Skip to content

Commit

Permalink
Integration RDMA module with llm cache. (#1963)
Browse files Browse the repository at this point in the history
Fixes #1942

Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm authored Aug 9, 2024
1 parent f8eda9c commit 87c8c02
Show file tree
Hide file tree
Showing 20 changed files with 1,469 additions and 35 deletions.
46 changes: 46 additions & 0 deletions modules/llm-cache/ds/kv_cache_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.
#include "llm-cache/ds/kv_cache_manager.h"
#include "llm-cache/storage/blob_storage.h"
#include "llm-cache/storage/local_file_storage.h"
#include "llm-cache/storage/vineyard_file_storage.h"

namespace vineyard {

Expand Down Expand Up @@ -88,6 +89,33 @@ Status KVCacheManager::Make(std::shared_ptr<KVCacheManager>& manager,
return Status::OK();
}

/**
 * @brief Build a KVCacheManager whose storage is a VineyardFileStorage.
 *
 * @param rpc_client RPC client forwarded to the VineyardFileStorage
 * constructor.
 * @param ipc_client IPC client forwarded to the VineyardFileStorage
 * constructor.
 * @param manager Output parameter that receives the newly created manager.
 * @param config File cache configuration. Its fields are validated, passed
 * to the storage constructor, and a copy is attached to the manager.
 *
 * @return Status::Invalid when chunkSize, hashChunkSize, tensorByte,
 * cacheCapacity or layer is not positive, or when filesystemType is not
 * FilesystemType::VINEYARD. A failing storage Init() is handled by
 * RETURN_ON_ERROR. Status::OK() otherwise.
 */
Status KVCacheManager::Make(RPCClient& rpc_client, Client& ipc_client,
                            std::shared_ptr<KVCacheManager>& manager,
                            FileCacheConfig& config) {
  const bool badChunking = config.chunkSize <= 0 || config.hashChunkSize <= 0;
  if (badChunking) {
    return Status::Invalid("Invalid batch size or split number.");
  }

  const bool badDimensions =
      config.tensorByte <= 0 || config.cacheCapacity <= 0 || config.layer <= 0;
  if (badDimensions) {
    return Status::Invalid("Invalid tensor byte, cache capacity or layer.");
  }

  // This overload only knows how to build vineyard-backed file storage.
  const bool useVineyard = config.filesystemType == FilesystemType::VINEYARD;
  if (!useVineyard) {
    return Status::Invalid("Unsupported filesystem type");
  }

  std::shared_ptr<FileStorage> backend = std::make_shared<VineyardFileStorage>(
      rpc_client, ipc_client, config.tensorByte, config.cacheCapacity,
      config.layer, config.chunkSize, config.hashChunkSize, config.root,
      config.gcInterval, config.ttl, config.enbaleGlobalGC,
      config.globalGCInterval, config.globalTTL);

  // NOTE(review): the out-parameter is assigned before Init() runs, so a
  // caller may observe a non-null manager even when an error is returned.
  manager = std::make_shared<KVCacheManager>(backend);
  RETURN_ON_ERROR(backend->Init());

  manager->config = std::make_shared<FileCacheConfig>(config);
  return Status::OK();
}

/**
* @brief Update the kv state with the given token list in the kv state cache
* manager.
Expand Down Expand Up @@ -250,6 +278,17 @@ Status KVCacheManager::Update(
return storage->Update(tokenList, nextToken, kvState);
}

/**
 * @brief Hand a batched update over to the underlying storage.
 *
 * @param tokenList The token list to update.
 * @param kvCacheList One entry per token in tokenList; the two lists must
 * have the same length.
 * @param updated Forwarded to the storage's BatchedUpdate. It is left
 * untouched when the length check fails.
 *
 * @return Status::Invalid when the list lengths differ, otherwise whatever
 * the storage's BatchedUpdate returns.
 */
Status KVCacheManager::BatchedUpdate(
    const std::vector<int>& tokenList,
    const std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
    size_t& updated) {
  const bool lengthsAgree = tokenList.size() == kvCacheList.size();
  if (lengthsAgree) {
    return storage->BatchedUpdate(tokenList, kvCacheList, updated);
  }
  return Status::Invalid("Token list size not match kv state list size");
}

/**
* @brief Query the kv state with the given token list in the kv state cache
* manager.
Expand Down Expand Up @@ -400,6 +439,13 @@ Status KVCacheManager::Query(
return storage->Query(prefix, tokenList, kvCacheList, matched);
}

/**
 * @brief Hand a batched query over to the underlying storage.
 *
 * No validation is done here; all three arguments are passed through
 * unchanged.
 *
 * @param tokenList The token list to look up.
 * @param kvCacheList Forwarded to the storage's BatchedQuery.
 * @param matched Forwarded to the storage's BatchedQuery.
 *
 * @return Whatever the storage's BatchedQuery returns.
 */
Status KVCacheManager::BatchedQuery(
    const std::vector<int>& tokenList,
    std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
    size_t& matched) {
  Status queryStatus = storage->BatchedQuery(tokenList, kvCacheList, matched);
  return queryStatus;
}

Status KVCacheManager::ClearGlobalCache(Client& client,
VineyardCacheConfig& config) {
return BlobStorage::ClearGlobalCache(client, config.llmCacheSyncLock,
Expand Down
14 changes: 14 additions & 0 deletions modules/llm-cache/ds/kv_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class KVCacheManager {
static Status Make(std::shared_ptr<KVCacheManager>& manager,
FileCacheConfig& config);

/**
 * @brief Create a KVCacheManager backed by a VineyardFileStorage.
 *
 * @param rpc_client RPC client handed to the storage.
 * @param ipc_client IPC client handed to the storage.
 * @param manager Output parameter that receives the created manager.
 * @param config File cache configuration; config.filesystemType must be
 * FilesystemType::VINEYARD, and chunkSize, hashChunkSize, tensorByte,
 * cacheCapacity and layer must all be positive.
 *
 * @return Status::Invalid if the configuration is rejected, otherwise
 * Status::OK() once the storage has been initialised.
 */
static Status Make(RPCClient& rpc_client, Client& ipc_client,
std::shared_ptr<KVCacheManager>& manager,
FileCacheConfig& config);

Status Update(const std::vector<int>& tokenList, int nextToken,
const std::vector<std::pair<LLMKV, LLMKV>>& kvState);

Expand All @@ -54,6 +58,11 @@ class KVCacheManager {
const std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& updated);

/**
 * @brief Forward a batched update to the underlying storage.
 *
 * @param tokenList The token list to update.
 * @param kvCacheList Must have the same number of entries as tokenList;
 * otherwise Status::Invalid is returned and the storage is not called.
 * @param updated Forwarded to the storage's BatchedUpdate.
 *
 * @return Status::Invalid on a length mismatch, otherwise the storage's
 * result.
 */
Status BatchedUpdate(
const std::vector<int>& tokenList,
const std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& updated);

Status Query(const std::vector<int>& tokenList,
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched);
Expand All @@ -66,6 +75,11 @@ class KVCacheManager {
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched);

/**
 * @brief Forward a batched query to the underlying storage.
 *
 * @param tokenList The token list to look up.
 * @param kvCacheList Forwarded to the storage's BatchedQuery.
 * @param matched Forwarded to the storage's BatchedQuery.
 *
 * @return The status returned by the storage's BatchedQuery.
 */
Status BatchedQuery(
const std::vector<int>& tokenList,
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched);

void Close();

void StopGlobalGCThread();
Expand Down
Loading

0 comments on commit 87c8c02

Please sign in to comment.