Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

liujp/raw client #93

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
cmake_minimum_required(VERSION 2.8)
project(kvClient)
set (CMAKE_CXX_STANDARD 17)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2")
#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O0 -g")
set (CMAKE_EXPORT_COMPILE_COMMANDS ON)

message("${CMAKE_CXX_FLAGS}")

enable_testing()

if (NOT gRPC_FOUND)
Expand Down
10 changes: 5 additions & 5 deletions ci/build-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ fi

build_dir="$SRCPATH/build"
mkdir -p $build_dir && cd $build_dir
cmake "$SRCPATH" \
cmake3 "$SRCPATH" \
-DENABLE_TESTS=on
make -j $NPROC

nohup /mock-tikv/bin/mock-tikv &
mock_kv_pid=$!
# nohup /mock-tikv/bin/mock-tikv &
# mock_kv_pid=$!

cd "$build_dir" && make test
# cd "$build_dir" && make test

kill -9 $mock_kv_pid
# kill -9 $mock_kv_pid


36 changes: 36 additions & 0 deletions include/pingcap/Histogram.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once
#include <string>

namespace pingcap
{
class Histogram {
public:
Histogram() {}
~Histogram() {}

void Clear();
void Add(double value);
void Merge(const Histogram& other);
std::string ToString() const;
double Median() const;
double Percentile(double p) const;
double Average() const;
double StandardDeviation() const;
double Minimum() const;
double Count() const;
double Maximum() const;

private:
enum { kNumBuckets = 154 };
static const double kBucketLimit[kNumBuckets];

double min_;
double max_;
double num_;
double sum_;
double sum_squares_;

double buckets_[kNumBuckets];
};

} //namespace pingcap
3 changes: 3 additions & 0 deletions include/pingcap/kv/Backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ constexpr int cleanupMaxBackoff = 20000;
constexpr int copBuildTaskMaxBackoff = 5000;
constexpr int copNextMaxBackoff = 20000;
constexpr int pessimisticLockMaxBackoff = 20000;
constexpr int RawGetMaxBackoff = 20000;
constexpr int RawPutMaxBackoff = 20000;
constexpr int RawDeleteMaxBackoff = 20000;

using BackoffPtr = std::shared_ptr<Backoff>;

Expand Down
2 changes: 1 addition & 1 deletion include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct Cluster

LockResolverPtr lock_resolver;

Cluster() : pd_client(std::make_shared<pd::MockPDClient>()), rpc_client(std::make_unique<RpcClient>()) {}
// Cluster() : pd_client(std::make_shared<pd::MockPDClient>()), rpc_client(std::make_unique<RpcClient>()) {}

Cluster(const std::vector<std::string> & pd_addrs, const ClusterConfig & config)
: pd_client(std::make_shared<pd::CodecClient>(pd_addrs, config)),
Expand Down
61 changes: 61 additions & 0 deletions include/pingcap/kv/RawClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include <pingcap/kv/Cluster.h>
#include <pingcap/kv/RegionClient.h>

#include <optional>

namespace pingcap
{
namespace kv
{

enum ColumnFamily: int8_t {
Default = 0,
Lock,
Write,
};
constexpr const char* kCfString[3] = {"default", "lock", "write"};

// raw client imitate from the rust raw client
//https://docs.rs/tikv-client/latest/tikv_client/struct.RawClient.html
struct RawClient
{
ClusterPtr cluster_ptr;
bool for_cas;
ColumnFamily cf;

RawClient(const std::vector<std::string> & pd_addrs);
RawClient(const std::vector<std::string> & pd_addrs, bool cas);
RawClient(const std::vector<std::string> & pd_addrs, const ClusterConfig & config);
RawClient(const std::vector<std::string> & pd_addrs, const ClusterConfig & config, bool cas);
bool IsCASClient();
RawClient& AsCASClient();
RawClient& AsRawClient();
void SetColumnFamily(ColumnFamily cof);
std::string GetColumnFamily();

// without cache method
void Put(const std::string &key, const std::string &value);
void Put(const std::string &key, const std::string &value, uint64_t ttl);
void Put(const std::string &key, const std::string &value, int64_t timeout_ms);
void Put(const std::string &key, const std::string &value, int64_t timeout_ms, uint64_t ttl);
// delete
void Delete(const std::string &key);
void Delete(const std::string &key, int64_t timeout_ms);
uint64_t GetKeyTTL(const std::string &key);
uint64_t GetKeyTTL(const std::string &key, int64_t timeout_ms);
std::optional<std::string> Get(const std::string &key);
std::optional<std::string> Get(const std::string &key, int64_t timeout_ms);
std::optional<std::string> CompareAndSwap(const std::string &key, std::optional<std::string> old_value,
const std::string &new_value, bool &is_swap);
std::optional<std::string> CompareAndSwap(const std::string &key, std::optional<std::string> old_value,
const std::string &new_value, bool &is_swap, int64_t timeout_ms);
std::optional<std::string> CompareAndSwap(const std::string &key, std::optional<std::string> old_value,
const std::string &new_value, bool &is_swap, int64_t timeout_ms, uint64_t ttl);

};

} // namespace kv
} // namespace pingcap

8 changes: 6 additions & 2 deletions include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <pingcap/kv/Cluster.h>
#include <pingcap/kv/RegionCache.h>
#include <pingcap/kv/Rpc.h>
#include <Poco/AutoPtr.h>
#include <Poco/ConsoleChannel.h>

namespace pingcap
{
namespace kv
{

constexpr int dailTimeout = 5;
constexpr int dailTimeout = 100000;
constexpr int copTimeout = 20;

// RegionClient sends KV/Cop requests to tikv server (corresponding to `RegionRequestSender` in go-client). It handles network errors and some region errors internally.
Expand All @@ -34,7 +36,7 @@ struct RegionClient

// This method send a request to region, but is NOT Thread-Safe !!
template <typename T>
auto sendReqToRegion(Backoffer & bo, std::shared_ptr<T> req, int timeout = dailTimeout, StoreType store_type = StoreType::TiKV)
auto sendReqToRegion(Backoffer & bo, std::shared_ptr<T> req, int64_t timeout = dailTimeout, StoreType store_type = StoreType::TiKV)
{
RpcCall<T> rpc(req);
for (;;)
Expand All @@ -56,6 +58,7 @@ struct RegionClient
}
catch (const Exception & e)
{
log->warning("send rpc excpetion: " + e.displayText());
onSendFail(bo, e, ctx);
continue;
}
Expand All @@ -64,6 +67,7 @@ struct RegionClient
{
log->warning("region " + region_id.toString() + " find error: " + resp->region_error().message());
onRegionError(bo, ctx, resp->region_error());
// set error and return
}
else
{
Expand Down
5 changes: 3 additions & 2 deletions include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ class RpcCall

std::shared_ptr<S> getResp() { return resp; }

void call(std::shared_ptr<KvConnClient> client, int timeout)
void call(std::shared_ptr<KvConnClient> client, int64_t timeout)
{
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout));
// context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout));
context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(timeout));
auto status = Trait::doRPCCall(&context, client, *req, resp.get());
if (!status.ok())
{
Expand Down
11 changes: 11 additions & 0 deletions include/pingcap/kv/internal/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ PINGCAP_DEFINE_TRAITS(kvrpcpb, TxnHeartBeat, KvTxnHeartBeat)
PINGCAP_DEFINE_TRAITS(kvrpcpb, CheckSecondaryLocks, KvCheckSecondaryLocks)
PINGCAP_DEFINE_TRAITS(coprocessor, , Coprocessor)
PINGCAP_DEFINE_TRAITS(mpp, DispatchTask, DispatchMPPTask)
// add raw methods
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawGet, RawGet)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchGet, RawBatchGet)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawPut, RawPut)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchPut, RawBatchPut)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawDelete, RawDelete)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchDelete, RawBatchDelete)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawScan, RawScan)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawDeleteRange, RawDeleteRange)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawGetKeyTTL, RawGetKeyTTL)
PINGCAP_DEFINE_TRAITS(kvrpcpb, RawCAS, RawCompareAndSwap)


} // namespace kv
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/pd/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class Client : public IClient
// only implement a weak get ts.
uint64_t getTS() override;

std::string name() override {return "client";}

std::pair<metapb::Region, metapb::Peer> getRegionByKey(const std::string & key) override;

//std::pair<metapb::Region, metapb::Peer> getPrevRegion(std::string key) override;
Expand Down
4 changes: 4 additions & 0 deletions include/pingcap/pd/CodecClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ struct CodecClient : public Client
{
CodecClient(const std::vector<std::string> & addrs, const ClusterConfig & config) : Client(addrs, config) {}

std::string name() override {return "codeClient";}

std::pair<metapb::Region, metapb::Peer> getRegionByKey(const std::string & key) override
{
auto [region, leader] = Client::getRegionByKey(encodeBytes(key));
if(!region.has_encryption_meta())
return std::make_pair(region, leader);
return std::make_pair(processRegionResult(region), leader);
}

Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/pd/IClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IClient
virtual uint64_t getGCSafePoint() = 0;

virtual bool isMock() = 0;

virtual std::string name() {return "base";}
};

using ClientPtr = std::shared_ptr<IClient>;
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ list(APPEND kvClient_sources kv/Scanner.cc)
list(APPEND kvClient_sources pd/Client.cc)
list(APPEND kvClient_sources coprocessor/Client.cc)
list(APPEND kvClient_sources RedactHelpers.cc)
list(APPEND kvClient_sources kv/RawClient.cc)
list(APPEND kvClient_sources Histogram.cc)

set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include)

Expand Down
Loading