Enhancement: Added partial compression of data in the cache
- Values are serialized and compressed before being stored
- The reverse (decompression, then deserialization) is applied during a get call; see the usage sketch after the change summary below
s-bose7 committed Jun 21, 2024
1 parent ea6aa71 commit 8a9bdbc
Showing 7 changed files with 95 additions and 55 deletions.
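A minimal usage sketch of the behavior described in the commit message: compression happens inside put() and is reversed inside get(), so callers pass and receive plain values. This is illustrative only; it assumes the MemCache library is linked as in the test target, that an instantiation for MemCache<std::string, int> is available (the tests below use that combination), and that the project's include/ directory is on the include path. The main() wrapper and key names are not part of the commit.

#include <cassert>
#include <string>
#include "memcache.h"   // project header; assumes include/ is on the include path

int main() {
    MemCache<std::string, int> cache(100);   // capacity of 100 entries, as in the tests
    cache.put("answer", 42);                 // value is serialized and Snappy-compressed before storage
    assert(cache.get("answer") == 42);       // get() uncompresses and deserializes back to int
    assert(cache.get("missing") == int());   // absent keys return the default value of V
    return 0;
}

Note that get() returns V() for a missing key, so a stored value equal to the default is indistinguishable from a miss.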
15 changes: 10 additions & 5 deletions CMakeLists.txt
@@ -7,7 +7,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)

# Add source and header files
include_directories(include)
# include_directories(${CMAKE_SOURCE_DIR}/utils)
add_library(MemCache STATIC
src/memcache.cpp
src/frequencynode.cpp
@@ -17,10 +16,10 @@ add_library(MemCache STATIC
)

# Add Snappy
find_package(Snappy REQUIRED)
include_directories(${SNAPPY_INCLUDE_DIRS})
set(SNAPPY_INCLUDE_DIRS "/usr/include")
set(SNAPPY_LIBRARIES "/usr/lib/x86_64-linux-gnu/libsnappy.so")
target_link_libraries(MemCache ${SNAPPY_LIBRARIES})

target_sources(MemCache PRIVATE utils/compression.h)

# Option to use submodule for googletest
option(USE_SUBMODULE_GTEST "Use googletest from submodule" OFF)
@@ -44,5 +43,11 @@ add_executable(run_tests
tests/test_memcache.cpp
tests/test_thread_safety.cpp
tests/test_atomicity.cpp
tests/test_snapshot.cpp
)
target_link_libraries(run_tests gtest gtest_main pthread MemCache)

if(USE_SUBMODULE_GTEST)
target_link_libraries(run_tests gtest gtest_main pthread MemCache ${SNAPPY_LIBRARIES})
else()
target_link_libraries(run_tests ${GTEST_LIBRARIES} pthread MemCache ${SNAPPY_LIBRARIES})
endif()
4 changes: 0 additions & 4 deletions ROADMAP.md
@@ -10,10 +10,6 @@
# MINOR
- Snapshot creation

# MINOR
- Data compression
- Data serialization?

# MAJOR
- Supporting multiple eviction policies
- Custom eviction policies using comparison func
15 changes: 8 additions & 7 deletions include/memcache.h
@@ -7,17 +7,18 @@
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
#include <iostream>
#include <algorithm>
#include <climits>
#include <string>
#include <snappy.h>

#include "mapitem.h"
#include "keynode.h"
#include "frequencynode.h"
#include "../include/mapitem.h"
#include "../include/keynode.h"
#include "../include/frequencynode.h"

#include "../utils/memory_info.h"
#include "../utils/compression.h"

using namespace std;
using namespace chrono;
@@ -40,7 +41,7 @@ class MemCache {
FrequencyNode<KeyNode<K>> *HEAD;

// To store elements by key, Key to MapItem
unordered_map<K, MapItem<KeyNode<K>, V>> bykey;
unordered_map<K, MapItem<KeyNode<K>, string>> bykey;

// Update the frequency of a particular key
void update_frequency_of_the(K key);
@@ -81,7 +82,7 @@ class MemCache {

// An overloaded remove.
// To avoid recursive locks, deadlocks etc.
void remove(MapItem<KeyNode<K>, V> item);
void remove(MapItem<KeyNode<K>, string> item);

// Background job: TTL support on a separate thread, using a monotonic clock.
unordered_map<K, steady_clock::time_point> expiration_map;
@@ -101,7 +102,7 @@
 * get(K key): gets the value for the key
 * if the key exists in the cache; otherwise, returns the default value of V.
*/
K get(K key);
V get(K key);

/*
* put(int key, int value, int ttl = -1)
29 changes: 17 additions & 12 deletions src/memcache.cpp
@@ -23,21 +23,24 @@ MemCache<K, V>::~MemCache(){


template<typename K, typename V>
K MemCache<K, V>::get(K key) {
V MemCache<K, V>::get(K key) {
lock_guard<mutex> lock(cache_mutex);

if (bykey.count(key) > 0){
MapItem<KeyNode<K>, V> map_item = bykey.at(key);
MapItem<KeyNode<K>, string> map_item = bykey.at(key);
update_frequency_of_the(key);
return map_item.value;
// Decompress and deserialize the stored value before returning
return Compressor::deserialize<V>(
Compressor::uncompress(map_item.value), std::is_arithmetic<V>{}
);
}
return V();
}


template<typename K, typename V>
void MemCache<K, V>::update_frequency_of_the(K key){
MapItem<KeyNode<K>, V> map_item = bykey.at(key);
MapItem<KeyNode<K>, string> map_item = bykey.at(key);

FrequencyNode<KeyNode<K>> *cur_freq = map_item.node->parent;
FrequencyNode<KeyNode<K>> *new_freq = cur_freq->next;
@@ -79,17 +82,19 @@ void MemCache<K, V>::put(K key, V value, unsigned long ttl) {
if(ttl < 1){
ttl = INT_MAX; // Default ttl value
}
// Serialize and compress the value before storing
string compressed_val = Compressor::compress(
Compressor::serialize(value, std::is_arithmetic<V>{})
);
expiration_map[key] = steady_clock::now() + chrono::seconds(ttl);
if(bykey.count(key) > 0) {
// Cache hit: the key already exists
// Update the value of the key
bykey.at(key).value = value;
bykey.at(key).value = compressed_val;
// Update the frequency of the key
update_frequency_of_the(key);
return;
}
// Cache miss: evict if the cache is full before inserting the new key
if(this->curr_size == this->MAX_SIZE){
if(curr_size == MAX_SIZE){
apply_eviction_policy();
}
FrequencyNode<KeyNode<K>> *freq_node = HEAD->next;
Expand All @@ -99,8 +104,8 @@ void MemCache<K, V>::put(K key, V value, unsigned long ttl) {
KeyNode<K> *key_node = new KeyNode<K>(key, freq_node);
put_keynode_in_frequencynode(freq_node, key_node);
// Put a new entry into the Hash Table
bykey.insert(make_pair(key, MapItem<KeyNode<K>, V>(value, key_node)));
++this->curr_size;
bykey.insert(make_pair(key, MapItem<KeyNode<K>, string>(compressed_val, key_node)));
++curr_size;
}


@@ -191,7 +196,7 @@ void MemCache<K, V>::remove_keynode_from_frequencynode(


template<typename K, typename V>
void MemCache<K, V>::remove(MapItem<KeyNode<K>, V> item){
void MemCache<K, V>::remove(MapItem<KeyNode<K>, string> item){
// Seems redundant at first glance, but necessary to avoid recursive locking.
K key = item.node->key;
remove_keynode_from_frequencynode(item.node->parent, item.node);
@@ -206,7 +211,7 @@ bool MemCache<K, V>::remove(K key) {

bool key_removal_status = false;
if(bykey.count(key) > 0){
MapItem<KeyNode<K>, V> map_item = bykey.at(key);
MapItem<KeyNode<K>, string> map_item = bykey.at(key);
remove_keynode_from_frequencynode(map_item.node->parent, map_item.node);
bykey.erase(key);
curr_size -= 1;
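For reference, the transformation that put() applies before storing a value, and that get() reverses on read, can be exercised directly with the Compressor helper. The calls below mirror the expressions used in memcache.cpp; the include path and the main() wrapper are assumptions of this sketch, not part of the commit.

#include <cassert>
#include <string>
#include <type_traits>
#include "utils/compression.h"   // assumed path relative to the repository root

int main() {
    int value = 2606;
    // put() path: serialize the value, then Snappy-compress; this string is what lands in the map.
    std::string stored = Compressor::compress(
        Compressor::serialize(value, std::is_arithmetic<int>{}));
    // get() path: uncompress, then deserialize back to the value type.
    int restored = Compressor::deserialize<int>(
        Compressor::uncompress(stored), std::is_arithmetic<int>{});
    assert(restored == value);
    return 0;
}

For payloads this small the compressed string can be slightly larger than the input, so the space savings mainly apply to larger values.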
10 changes: 5 additions & 5 deletions tests/test_atomicity.cpp
@@ -5,7 +5,7 @@
#include <gtest/gtest.h>

// Shared cache
MemCache<string, string> cache(100);
MemCache<string, int> cache(100);


// Test atomicity for put operation
@@ -17,7 +17,7 @@ TEST(AtomicityTest, AtomicPutOperation) {

for (int i=1; i<=num_threads; i++) {
threads.emplace_back([&]() {
cache.put("key", "value");
cache.put("key", 2606);
++counter;
});
}
@@ -27,22 +27,22 @@
}

EXPECT_EQ(counter.load(), num_threads);
EXPECT_EQ(cache.get("key"), "value");
EXPECT_EQ(cache.get("key"), 2606);
EXPECT_EQ(cache.size(), 1);
}


// Test atomicity of get operation
TEST(AtomicityTest, AtomicGetOperation) {
cache.put("foo", "bar");
cache.put("foo", 3205);

std::atomic<int> counter(0);
const int num_threads = 100;
std::vector<std::thread> threads;

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&]() {
if(cache.get("foo") == "bar") {
if(cache.get("foo") == 3205) {
++counter;
}
});
22 changes: 0 additions & 22 deletions utils/compression.cc

This file was deleted.

55 changes: 55 additions & 0 deletions utils/compression.h
@@ -0,0 +1,55 @@
#ifndef COMPRESSION_H
#define COMPRESSION_H

#include <string>
#include <snappy.h>
#include <sstream>
#include <cstring>
#include <type_traits>

class Compressor {

public:

template<typename T>
static std::string serialize(const T& data, std::true_type) {
return std::to_string(data);
}

template<typename T>
static T deserialize(const std::string& decompressed, std::true_type) {
std::istringstream iss(decompressed);
T data;
iss >> data;
return data;
}

template<typename T>
static std::string serialize(const T& data, std::false_type) {
// A byte-by-byte copy of the memory representation of the value object.
const char* byte_seq = reinterpret_cast<const char*>(&data);
return std::string(byte_seq, sizeof(T));
}

template<typename T>
static T deserialize(const std::string& decompressed, std::false_type) {
T value;
std::memcpy(&value, decompressed.data(), sizeof(T));
return value;
}

static std::string compress(const std::string& serialized){
std::string compressed;
snappy::Compress(serialized.data(), serialized.size(), &compressed);
return compressed;
}

static std::string uncompress(const std::string& compressed){
std::string decompressed;
snappy::Uncompress(compressed.data(), compressed.size(), &decompressed);
return decompressed;
}

};

#endif
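A sketch of the non-arithmetic path, for completeness: when the value type is not arithmetic, the std::false_type overloads fall back to a raw byte copy of the object, which only round-trips trivially copyable types (a std::string value, for instance, would not survive this path). The Point struct below is a hypothetical example and not part of the repository; the include path is also an assumption.

#include <cassert>
#include <string>
#include <type_traits>
#include "compression.h"   // assumed include path

// Hypothetical trivially copyable value type; safe for the memcpy-based overloads.
struct Point { double x; double y; };

int main() {
    Point p{1.5, -2.25};
    std::string stored = Compressor::compress(
        Compressor::serialize(p, std::is_arithmetic<Point>{}));       // picks the std::false_type overload
    Point restored = Compressor::deserialize<Point>(
        Compressor::uncompress(stored), std::is_arithmetic<Point>{}); // memcpy back into a Point
    assert(restored.x == p.x && restored.y == p.y);
    return 0;
}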
