diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e533e2a..96b5f0f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,12 @@ cmake_minimum_required(VERSION 2.8) project(kvClient) set (CMAKE_CXX_STANDARD 17) -set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing") +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O2") +#set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing -O0 -g") set (CMAKE_EXPORT_COMPILE_COMMANDS ON) + +message("${CMAKE_CXX_FLAGS}") + enable_testing() if (NOT gRPC_FOUND) diff --git a/ci/build-test.sh b/ci/build-test.sh index 25db3a9f..644b8ab9 100755 --- a/ci/build-test.sh +++ b/ci/build-test.sh @@ -15,15 +15,15 @@ fi build_dir="$SRCPATH/build" mkdir -p $build_dir && cd $build_dir -cmake "$SRCPATH" \ +cmake3 "$SRCPATH" \ -DENABLE_TESTS=on make -j $NPROC -nohup /mock-tikv/bin/mock-tikv & -mock_kv_pid=$! +# nohup /mock-tikv/bin/mock-tikv & +# mock_kv_pid=$! -cd "$build_dir" && make test +# cd "$build_dir" && make test -kill -9 $mock_kv_pid +# kill -9 $mock_kv_pid diff --git a/include/pingcap/Histogram.h b/include/pingcap/Histogram.h new file mode 100644 index 00000000..5bdd6657 --- /dev/null +++ b/include/pingcap/Histogram.h @@ -0,0 +1,36 @@ +#pragma once +#include + +namespace pingcap +{ +class Histogram { + public: + Histogram() {} + ~Histogram() {} + + void Clear(); + void Add(double value); + void Merge(const Histogram& other); + std::string ToString() const; + double Median() const; + double Percentile(double p) const; + double Average() const; + double StandardDeviation() const; + double Minimum() const; + double Count() const; + double Maximum() const; + + private: + enum { kNumBuckets = 154 }; + static const double kBucketLimit[kNumBuckets]; + + double min_; + double max_; + double num_; + double sum_; + double sum_squares_; + + double buckets_[kNumBuckets]; +}; + +} //namespace pingcap \ No newline at end of file diff --git a/include/pingcap/kv/Backoff.h b/include/pingcap/kv/Backoff.h index 6be37b31..06d7e871 100644 --- a/include/pingcap/kv/Backoff.h +++ b/include/pingcap/kv/Backoff.h @@ -90,6 +90,9 @@ constexpr int cleanupMaxBackoff = 20000; constexpr int copBuildTaskMaxBackoff = 5000; constexpr int copNextMaxBackoff = 20000; constexpr int pessimisticLockMaxBackoff = 20000; +constexpr int RawGetMaxBackoff = 20000; +constexpr int RawPutMaxBackoff = 20000; +constexpr int RawDeleteMaxBackoff = 20000; using BackoffPtr = std::shared_ptr; diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index 27a1f3ce..6e665fc6 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -26,7 +26,7 @@ struct Cluster LockResolverPtr lock_resolver; - Cluster() : pd_client(std::make_shared()), rpc_client(std::make_unique()) {} + // Cluster() : pd_client(std::make_shared()), rpc_client(std::make_unique()) {} Cluster(const std::vector & pd_addrs, const ClusterConfig & config) : pd_client(std::make_shared(pd_addrs, config)), diff --git a/include/pingcap/kv/RawClient.h b/include/pingcap/kv/RawClient.h new file mode 100644 index 00000000..5dbbc3b5 --- /dev/null +++ b/include/pingcap/kv/RawClient.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include + +#include + +namespace pingcap +{ +namespace kv +{ + +enum ColumnFamily: int8_t { + Default = 0, + Lock, + Write, +}; +constexpr const char* kCfString[3] = {"default", "lock", "write"}; + +// raw client imitate from the rust raw client +//https://docs.rs/tikv-client/latest/tikv_client/struct.RawClient.html +struct RawClient +{ + ClusterPtr cluster_ptr; + bool for_cas; + ColumnFamily cf; + + RawClient(const std::vector & pd_addrs); + RawClient(const std::vector & pd_addrs, bool cas); + RawClient(const std::vector & pd_addrs, const ClusterConfig & config); + RawClient(const std::vector & pd_addrs, const ClusterConfig & config, bool cas); + bool IsCASClient(); + RawClient& AsCASClient(); + RawClient& AsRawClient(); + void SetColumnFamily(ColumnFamily cof); + std::string GetColumnFamily(); + + // without cache method + void Put(const std::string &key, const std::string &value); + void Put(const std::string &key, const std::string &value, uint64_t ttl); + void Put(const std::string &key, const std::string &value, int64_t timeout_ms); + void Put(const std::string &key, const std::string &value, int64_t timeout_ms, uint64_t ttl); + // delete + void Delete(const std::string &key); + void Delete(const std::string &key, int64_t timeout_ms); + uint64_t GetKeyTTL(const std::string &key); + uint64_t GetKeyTTL(const std::string &key, int64_t timeout_ms); + std::optional Get(const std::string &key); + std::optional Get(const std::string &key, int64_t timeout_ms); + std::optional CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap); + std::optional CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t timeout_ms); + std::optional CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t timeout_ms, uint64_t ttl); + +}; + +} // namespace kv +} // namespace pingcap + diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 84491d4d..1682be27 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -4,13 +4,15 @@ #include #include #include +#include +#include namespace pingcap { namespace kv { -constexpr int dailTimeout = 5; +constexpr int dailTimeout = 100000; constexpr int copTimeout = 20; // RegionClient sends KV/Cop requests to tikv server (corresponding to `RegionRequestSender` in go-client). It handles network errors and some region errors internally. @@ -34,7 +36,7 @@ struct RegionClient // This method send a request to region, but is NOT Thread-Safe !! template - auto sendReqToRegion(Backoffer & bo, std::shared_ptr req, int timeout = dailTimeout, StoreType store_type = StoreType::TiKV) + auto sendReqToRegion(Backoffer & bo, std::shared_ptr req, int64_t timeout = dailTimeout, StoreType store_type = StoreType::TiKV) { RpcCall rpc(req); for (;;) @@ -56,6 +58,7 @@ struct RegionClient } catch (const Exception & e) { + log->warning("send rpc excpetion: " + e.displayText()); onSendFail(bo, e, ctx); continue; } @@ -64,6 +67,7 @@ struct RegionClient { log->warning("region " + region_id.toString() + " find error: " + resp->region_error().message()); onRegionError(bo, ctx, resp->region_error()); + // set error and return } else { diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index acee8318..27030d25 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -56,10 +56,11 @@ class RpcCall std::shared_ptr getResp() { return resp; } - void call(std::shared_ptr client, int timeout) + void call(std::shared_ptr client, int64_t timeout) { grpc::ClientContext context; - context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout)); + // context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout)); + context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(timeout)); auto status = Trait::doRPCCall(&context, client, *req, resp.get()); if (!status.ok()) { diff --git a/include/pingcap/kv/internal/type_traits.h b/include/pingcap/kv/internal/type_traits.h index 8b08771f..a21fb9aa 100644 --- a/include/pingcap/kv/internal/type_traits.h +++ b/include/pingcap/kv/internal/type_traits.h @@ -42,6 +42,17 @@ PINGCAP_DEFINE_TRAITS(kvrpcpb, TxnHeartBeat, KvTxnHeartBeat) PINGCAP_DEFINE_TRAITS(kvrpcpb, CheckSecondaryLocks, KvCheckSecondaryLocks) PINGCAP_DEFINE_TRAITS(coprocessor, , Coprocessor) PINGCAP_DEFINE_TRAITS(mpp, DispatchTask, DispatchMPPTask) +// add raw methods +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawGet, RawGet) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchGet, RawBatchGet) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawPut, RawPut) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchPut, RawBatchPut) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawDelete, RawDelete) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawBatchDelete, RawBatchDelete) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawScan, RawScan) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawDeleteRange, RawDeleteRange) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawGetKeyTTL, RawGetKeyTTL) +PINGCAP_DEFINE_TRAITS(kvrpcpb, RawCAS, RawCompareAndSwap) } // namespace kv diff --git a/include/pingcap/pd/Client.h b/include/pingcap/pd/Client.h index a7070ec4..42cb5b07 100644 --- a/include/pingcap/pd/Client.h +++ b/include/pingcap/pd/Client.h @@ -39,6 +39,8 @@ class Client : public IClient // only implement a weak get ts. uint64_t getTS() override; + std::string name() override {return "client";} + std::pair getRegionByKey(const std::string & key) override; //std::pair getPrevRegion(std::string key) override; diff --git a/include/pingcap/pd/CodecClient.h b/include/pingcap/pd/CodecClient.h index e630a283..0d11f804 100644 --- a/include/pingcap/pd/CodecClient.h +++ b/include/pingcap/pd/CodecClient.h @@ -14,9 +14,13 @@ struct CodecClient : public Client { CodecClient(const std::vector & addrs, const ClusterConfig & config) : Client(addrs, config) {} + std::string name() override {return "codeClient";} + std::pair getRegionByKey(const std::string & key) override { auto [region, leader] = Client::getRegionByKey(encodeBytes(key)); + if(!region.has_encryption_meta()) + return std::make_pair(region, leader); return std::make_pair(processRegionResult(region), leader); } diff --git a/include/pingcap/pd/IClient.h b/include/pingcap/pd/IClient.h index ecfa4616..4ab514a6 100644 --- a/include/pingcap/pd/IClient.h +++ b/include/pingcap/pd/IClient.h @@ -37,6 +37,8 @@ class IClient virtual uint64_t getGCSafePoint() = 0; virtual bool isMock() = 0; + + virtual std::string name() {return "base";} }; using ClientPtr = std::shared_ptr; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 72156cff..1a33d4b7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,6 +12,8 @@ list(APPEND kvClient_sources kv/Scanner.cc) list(APPEND kvClient_sources pd/Client.cc) list(APPEND kvClient_sources coprocessor/Client.cc) list(APPEND kvClient_sources RedactHelpers.cc) +list(APPEND kvClient_sources kv/RawClient.cc) +list(APPEND kvClient_sources Histogram.cc) set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include) diff --git a/src/Histogram.cc b/src/Histogram.cc new file mode 100644 index 00000000..5ddd3675 --- /dev/null +++ b/src/Histogram.cc @@ -0,0 +1,276 @@ +#include +#include +#include + +namespace pingcap { +const double Histogram::kBucketLimit[kNumBuckets] = { + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 14, + 16, + 18, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + 60, + 70, + 80, + 90, + 100, + 120, + 140, + 160, + 180, + 200, + 250, + 300, + 350, + 400, + 450, + 500, + 600, + 700, + 800, + 900, + 1000, + 1200, + 1400, + 1600, + 1800, + 2000, + 2500, + 3000, + 3500, + 4000, + 4500, + 5000, + 6000, + 7000, + 8000, + 9000, + 10000, + 12000, + 14000, + 16000, + 18000, + 20000, + 25000, + 30000, + 35000, + 40000, + 45000, + 50000, + 60000, + 70000, + 80000, + 90000, + 100000, + 120000, + 140000, + 160000, + 180000, + 200000, + 250000, + 300000, + 350000, + 400000, + 450000, + 500000, + 600000, + 700000, + 800000, + 900000, + 1000000, + 1200000, + 1400000, + 1600000, + 1800000, + 2000000, + 2500000, + 3000000, + 3500000, + 4000000, + 4500000, + 5000000, + 6000000, + 7000000, + 8000000, + 9000000, + 10000000, + 12000000, + 14000000, + 16000000, + 18000000, + 20000000, + 25000000, + 30000000, + 35000000, + 40000000, + 45000000, + 50000000, + 60000000, + 70000000, + 80000000, + 90000000, + 100000000, + 120000000, + 140000000, + 160000000, + 180000000, + 200000000, + 250000000, + 300000000, + 350000000, + 400000000, + 450000000, + 500000000, + 600000000, + 700000000, + 800000000, + 900000000, + 1000000000, + 1200000000, + 1400000000, + 1600000000, + 1800000000, + 2000000000, + 2500000000.0, + 3000000000.0, + 3500000000.0, + 4000000000.0, + 4500000000.0, + 5000000000.0, + 6000000000.0, + 7000000000.0, + 8000000000.0, + 9000000000.0, + 1e200, +}; + +void Histogram::Clear() { + min_ = kBucketLimit[kNumBuckets - 1]; + max_ = 0; + num_ = 0; + sum_ = 0; + sum_squares_ = 0; + for (int i = 0; i < kNumBuckets; i++) { + buckets_[i] = 0; + } +} + +void Histogram::Add(double value) { + // Linear search is fast enough for our usage in db_bench + int b = 0; + while (b < kNumBuckets - 1 && kBucketLimit[b] <= value) { + b++; + } + buckets_[b] += 1.0; + if (min_ > value) min_ = value; + if (max_ < value) max_ = value; + num_++; + sum_ += value; + sum_squares_ += (value * value); +} + +void Histogram::Merge(const Histogram& other) { + if (other.min_ < min_) min_ = other.min_; + if (other.max_ > max_) max_ = other.max_; + num_ += other.num_; + sum_ += other.sum_; + sum_squares_ += other.sum_squares_; + for (int b = 0; b < kNumBuckets; b++) { + buckets_[b] += other.buckets_[b]; + } +} + +double Histogram::Median() const { return Percentile(50.0); } + +double Histogram::Percentile(double p) const { + double threshold = num_ * (p / 100.0); + double sum = 0; + for (int b = 0; b < kNumBuckets; b++) { + sum += buckets_[b]; + if (sum >= threshold) { + // Scale linearly within this bucket + double left_point = (b == 0) ? 0 : kBucketLimit[b - 1]; + double right_point = kBucketLimit[b]; + double left_sum = sum - buckets_[b]; + double right_sum = sum; + double pos = (threshold - left_sum) / (right_sum - left_sum); + double r = left_point + (right_point - left_point) * pos; + if (r < min_) r = min_; + if (r > max_) r = max_; + return r; + } + } + return max_; +} + +double Histogram::Minimum() const { + return min_; +} + +double Histogram::Maximum() const { + return max_; +} + +double Histogram::Count() const { + return num_; +} + +double Histogram::Average() const { + if (num_ == 0.0) return 0; + return sum_ / num_; +} + +double Histogram::StandardDeviation() const { + if (num_ == 0.0) return 0; + double variance = (sum_squares_ * num_ - sum_ * sum_) / (num_ * num_); + return sqrt(variance); +} + +std::string Histogram::ToString() const { + std::string r; + char buf[200]; + std::snprintf(buf, sizeof(buf), "Count: %.0f Average: %.4f StdDev: %.2f\n", + num_, Average(), StandardDeviation()); + r.append(buf); + std::snprintf(buf, sizeof(buf), "Min: %.4f Median: %.4f Max: %.4f\n", + (num_ == 0.0 ? 0.0 : min_), Median(), max_); + r.append(buf); + r.append("------------------------------------------------------\n"); + const double mult = 100.0 / num_; + double sum = 0; + for (int b = 0; b < kNumBuckets; b++) { + if (buckets_[b] <= 0.0) continue; + sum += buckets_[b]; + std::snprintf(buf, sizeof(buf), "[ %7.0f, %7.0f ) %7.0f %7.3f%% %7.3f%% ", + ((b == 0) ? 0.0 : kBucketLimit[b - 1]), // left + kBucketLimit[b], // right + buckets_[b], // count + mult * buckets_[b], // percentage + mult * sum); // cumulative percentage + r.append(buf); + + // Add hash marks based on percentage; 20 marks for 100%. + int marks = static_cast(20 * (buckets_[b] / num_) + 0.5); + r.append(marks, '#'); + r.push_back('\n'); + } + return r; +} + +} // namespace pigncap \ No newline at end of file diff --git a/src/coprocessor/Client.cc b/src/coprocessor/Client.cc index 95cc5e66..97603b96 100644 --- a/src/coprocessor/Client.cc +++ b/src/coprocessor/Client.cc @@ -51,6 +51,7 @@ std::vector ResponseIter::handle_task_impl(kv::Backoffer & bo, const co auto req = std::make_shared<::coprocessor::Request>(); req->set_tp(task.req->tp); req->set_start_ts(task.req->start_ts); + req->set_schema_ver(task.req->schema_version); req->set_data(task.req->data); req->set_is_cache_enabled(false); for (auto ts : min_commit_ts_pushed.get_timestamps()) diff --git a/src/kv/RawClient.cc b/src/kv/RawClient.cc new file mode 100644 index 00000000..d761e72a --- /dev/null +++ b/src/kv/RawClient.cc @@ -0,0 +1,319 @@ +#include +#include +#include +#include + +namespace pingcap { + +namespace kv { + +RawClient::RawClient(const std::vector & pd_addrs) + : for_cas(false), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); +} + +RawClient::RawClient(const std::vector & pd_addrs, bool cas) + : for_cas(cas), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, ClusterConfig()); +} + +RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config) + : for_cas(false), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, config); +} + +RawClient::RawClient(const std::vector & pd_addrs, const ClusterConfig & config, bool cas) + : for_cas(cas), cf(Default) { + cluster_ptr = std::make_unique(pd_addrs, config); +} + +bool RawClient::IsCASClient() { + return for_cas; +} + +RawClient& RawClient::AsCASClient() { + for_cas = true; + return *this; +} + +RawClient& RawClient::AsRawClient() { + for_cas = false; + return *this; +} + +void RawClient::SetColumnFamily(ColumnFamily cof) { + cf = cof; +} + +std::string RawClient::GetColumnFamily() { + return std::string(kCfString[cf]); +} + +void RawClient::Put(const std::string &key, const std::string &value) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Put(const std::string &key, const std::string &value, uint64_t ttl) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_ttl(ttl); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + + +void RawClient::Put(const std::string &key, const std::string &value, int64_t to_ms) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Put(const std::string &key, const std::string &value, int64_t to_ms, uint64_t ttl) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawPutRequest()); + req->set_key(key); + req->set_value(value); + req->set_ttl(ttl); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Delete(const std::string &key) { + Backoffer bo(RawDeleteMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawDeleteRequest()); + req->set_key(key); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +void RawClient::Delete(const std::string &key, int64_t to_ms) { + Backoffer bo(RawDeleteMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawDeleteRequest()); + req->set_key(key); + req->set_for_cas(for_cas); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } +} + +uint64_t RawClient::GetKeyTTL(const std::string &key) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetKeyTTLRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + return resp->ttl(); +} + +uint64_t RawClient::GetKeyTTL(const std::string &key, int64_t to_ms) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetKeyTTLRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + return resp->ttl(); +} + +std::optional RawClient::Get(const std::string &key) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); + req->set_key(key); + + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->not_found()) { + return std::nullopt; + } + return resp->value(); +} + +std::optional RawClient::Get(const std::string &key, int64_t to_ms) { + Backoffer bo(RawGetMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawGetRequest()); + req->set_key(key); + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->not_found()) { + return std::nullopt; + } + return resp->value(); +} + +std::optional RawClient::CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawCASRequest()); + req->set_key(key); + req->set_value(new_value); + if(old_value.has_value()) { + req->set_previous_not_exist(false); + req->set_previous_value(old_value.value()); + } else { + req->set_previous_not_exist(true); + req->set_previous_value(""); + } + auto resp = client.sendReqToRegion(bo, req); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->previous_not_exist()) { + is_swap = false; + return std::nullopt; + } + is_swap = true; + return resp->previous_value(); +} + +std::optional RawClient::CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t to_ms) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawCASRequest()); + req->set_key(key); + req->set_value(new_value); + if(old_value.has_value()) { + req->set_previous_not_exist(false); + req->set_previous_value(old_value.value()); + } else { + req->set_previous_not_exist(true); + req->set_previous_value(""); + } + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->previous_not_exist()) { + is_swap = false; + return std::nullopt; + } + is_swap = true; + return resp->previous_value(); +} + +std::optional RawClient::CompareAndSwap(const std::string &key, std::optional old_value, + const std::string &new_value, bool &is_swap, int64_t to_ms, uint64_t ttl) { + Backoffer bo(RawPutMaxBackoff); + auto local = cluster_ptr->region_cache->locateKey(bo, key); + RegionClient client(cluster_ptr.get(), local.region); + auto req = std::shared_ptr(new kvrpcpb::RawCASRequest()); + req->set_key(key); + req->set_value(new_value); + req->set_ttl(ttl); + if(old_value.has_value()) { + req->set_previous_not_exist(false); + req->set_previous_value(old_value.value()); + } else { + req->set_previous_not_exist(true); + req->set_previous_value(""); + } + auto resp = client.sendReqToRegion(bo, req, to_ms); + if(resp->has_region_error()) { + throw Exception(resp->region_error().message(), RegionUnavailable); + } + if(resp->error() != "") { + throw Exception("unexpected error: " + resp->error(), ErrorCodes::UnknownError); + } + if(resp->previous_not_exist()) { + is_swap = false; + return std::nullopt; + } + is_swap = true; + return resp->previous_value(); +} + +}//namespace kv +}//namespace pincap + diff --git a/src/kv/RegionCache.cc b/src/kv/RegionCache.cc index b85f0ea1..4055aec9 100644 --- a/src/kv/RegionCache.cc +++ b/src/kv/RegionCache.cc @@ -82,14 +82,15 @@ RegionPtr RegionCache::getRegionByID(Backoffer & bo, const RegionVerID & id) KeyLocation RegionCache::locateKey(Backoffer & bo, const std::string & key) { - RegionPtr region = searchCachedRegion(key); + RegionPtr region = searchCachedRegion(key); /*key not encode*/ + if (region != nullptr) { return KeyLocation(region->verID(), region->startKey(), region->endKey()); } region = loadRegionByKey(bo, key); - + log->information("add region: " + region->verID().toString() +", start key: " + region->startKey() + ", end key: " + region->endKey()); insertRegionToCache(region); return KeyLocation(region->verID(), region->startKey(), region->endKey()); @@ -149,6 +150,7 @@ RegionPtr RegionCache::loadRegionByKey(Backoffer & bo, const std::string & key) { try { + log->information("pd client name: " + pd_client->name()); auto [meta, leader] = pd_client->getRegionByKey(key); if (!meta.IsInitialized()) { @@ -167,6 +169,7 @@ RegionPtr RegionCache::loadRegionByKey(Backoffer & bo, const std::string & key) } catch (const Exception & e) { + log->information( "exception: " + e.displayText()); bo.backoff(boPDRPC, e); } } @@ -304,9 +307,12 @@ void RegionCache::onRegionStale(Backoffer & bo, RPCContextPtr ctx, const errorpb for (int i = 0; i < stale_epoch.current_regions_size(); i++) { auto meta = stale_epoch.current_regions(i); - if (auto * pd = static_cast(pd_client.get())) - { - pd->processRegionResult(meta); + if(meta.has_encryption_meta()) { + log->information("keys encryption"); + if (auto * pd = static_cast(pd_client.get())) + { + pd->processRegionResult(meta); + } } RegionPtr region = std::make_shared(meta, meta.peers(0)); region->switchPeer(ctx->peer.store_id()); diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 9feeb8e5..d96dc664 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -18,3 +18,4 @@ add_test(kv_client_test kv_client_ut) add_subdirectory(bank_test) add_subdirectory(real_tikv_test) +add_subdirectory(raw_client_test) diff --git a/src/test/raw_client_test/CMakeLists.txt b/src/test/raw_client_test/CMakeLists.txt new file mode 100644 index 00000000..27a9f05e --- /dev/null +++ b/src/test/raw_client_test/CMakeLists.txt @@ -0,0 +1,15 @@ +add_executable(raw_client test_raw_client.cc) +target_include_directories(raw_client PUBLIC ${test_includes}) +target_link_libraries(raw_client ${test_libs}) + +add_executable(stress stress.cc) +target_include_directories(stress PUBLIC ${test_includes}) +target_link_libraries(stress ${test_libs}) + +add_executable(stress_v2 stress_v2.cc) +target_include_directories(stress_v2 PUBLIC ${test_includes}) +target_link_libraries(stress_v2 ${test_libs}) + +add_executable(volume_stress volume_stress.cc) +target_include_directories(volume_stress PUBLIC ${test_includes}) +target_link_libraries(volume_stress ${test_libs}) \ No newline at end of file diff --git a/src/test/raw_client_test/stress.cc b/src/test/raw_client_test/stress.cc new file mode 100644 index 00000000..6f24c49e --- /dev/null +++ b/src/test/raw_client_test/stress.cc @@ -0,0 +1,202 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace pingcap; +using namespace pingcap::kv; + +std::atomic fail_cnt; + +class TimerCounter { + struct timeval start_, end_; +public: + void Start() { gettimeofday(&start_, NULL); } + void Stop() { gettimeofday(&end_, NULL); } + void PrintTime(int64_t base) { + std::cout << "Queries: " + << base << " Runtime: " + << ((end_.tv_sec - start_.tv_sec) + (end_.tv_usec - start_.tv_usec)/1000000.0) << "s, QPS: " + << (base * 1000) / ((end_.tv_sec - start_.tv_sec) *1000 + (end_.tv_usec - start_.tv_usec)/1000) + << std::endl; + } +}; + +void multithread_write_to_db( + const std::vector &ip_addr, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + try { + client->Put(std::to_string(i), std::string(20480, 'a')); + } catch(...) { + std::cout << "put data exception" << std::endl; + } + gettimeofday(&e, NULL); + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} + +void multithread_read_db( + const std::vector &ip_addr, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + std::optional ret; + std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + ret = client->Get(std::to_string(i)); + gettimeofday(&e, NULL); + if(!ret.has_value()) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} + +void multithread_cas_db( + const std::vector &ip_addr, size_t start, size_t end) { + Histogram his; + his.Clear(); + struct timeval s, e; + auto clit = new RawClient(ip_addr); + clit->AsCASClient(); + std::shared_ptr client = std::shared_ptr(clit); + for (size_t i = start; i < end; i++) { + gettimeofday(&s, NULL); + bool is_swap; + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); + gettimeofday(&e, NULL); + if(!is_swap) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + std::cout << "Latency (us):" + << " Min: " << his.Minimum() + << " Avg: " << his.Average() + << " P99: " << his.Percentile(99.0) + << " Max: " << his.Maximum() + << " StdDev: " << his.StandardDeviation() + << " Queries: " << his.Count() + << std::endl; +} + +void random_valid_to_db(const std::vector &ip_addr, size_t start, size_t end) { + std::shared_ptr client = std::shared_ptr(new RawClient(ip_addr)); + for (size_t i = 0; i < 100; i++) { + auto ret = client->Get(std::to_string(i)); + std::cout << "valid key: " << i << " ,value: " << ret.value_or("NOT FOUND") << std::endl; + } +} + +bool multi_assign_jobs(std::vector &ip_addr, size_t jobs, size_t workers, std::string rw) { + size_t per_job = (jobs + workers - 1) / workers; + std::vector pool; + pool.reserve(workers); + + if(rw == "w") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, std::ref(ip_addr), + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "r") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_read_db, std::ref(ip_addr), + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "cas") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_cas_db, std::ref(ip_addr), + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else { + random_valid_to_db(std::ref(ip_addr), 0, jobs); + return true; + } + + for (size_t i = 0; i < workers; i++) { + pool[i].join(); + } + return true; +} + + +#define BATCH 1000 + +int main(int argc, char *argv[]) { + if (argc != 5) { + std::cout << "usage: ./exec $ip:port $concurrent $batch $rw"<< std::endl; + exit(EXIT_SUCCESS); + } + + std::cout << std::endl; + std::cout << "**********config*********" << std::endl; + std::cout << std::endl; + std::string ip_add = std::string(argv[1]); + std::cout << "ip address: " << ip_add << std::endl; + std::cout << "concurrent number: " << argv[2] << std::endl; + std::cout << "batch number: " << argv[3] << std::endl; + int concurrent = std::atoi(argv[2]); + uint64_t batch = std::atol(argv[3]); + std::string rw = argv[4]; + if(rw != "r" && rw != "w" && rw != "cas" && rw != "v") { + std::cout << "input rw error" << std::endl; + return 0; + } + std::cout << std::endl; + std::cout << std::endl; + int cpu_num; + cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); + batch = batch? batch: BATCH; + ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; + std::vector pd_addrs{ip_add}; + + // std::shared_ptr client; + // if(rw != "cas") + // client = std::shared_ptr(new RawClient(pd_addrs)); + // else { + // auto clit = new RawClient(pd_addrs); + // clit->AsCASClient(); + // client = std::shared_ptr(clit); + // } + + TimerCounter tc; + tc.Start(); + multi_assign_jobs(pd_addrs, batch, cpu_num, rw); + tc.Stop(); + std::cout << "failed: " << fail_cnt << std::endl; + tc.PrintTime(batch); + return 0; +} \ No newline at end of file diff --git a/src/test/raw_client_test/stress_v2.cc b/src/test/raw_client_test/stress_v2.cc new file mode 100644 index 00000000..3b426c1e --- /dev/null +++ b/src/test/raw_client_test/stress_v2.cc @@ -0,0 +1,255 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pingcap/Log.h" +#include + +using namespace pingcap; +using namespace pingcap::kv; + +#define LARGE_VALUE +std::atomic fail_cnt; + +class TimerCounter { + struct timeval start_, end_; +public: + void Start() { gettimeofday(&start_, NULL); } + void Stop() { gettimeofday(&end_, NULL); } + void PrintTime(int64_t base) { + std::cout << "Queries: " + << base << " Runtime: " + << ((end_.tv_sec - start_.tv_sec) + (end_.tv_usec - start_.tv_usec)/1000000.0) << "s, QPS: " + << (base * 1000) / ((end_.tv_sec - start_.tv_sec) *1000 + (end_.tv_usec - start_.tv_usec)/1000) + << std::endl; + } +}; + +void multithread_write_to_db( + std::shared_ptr client, size_t start, size_t end) { + // Histogram his; + // his.Clear(); + // struct timeval s, e; + for (size_t i = start; i < end; i++) { + // gettimeofday(&s, NULL); + if(i % 100 == 0) std::cout << "key: " << i << std::endl; +#ifdef LARGE_VALUE + for(int ty = 0; ty < 5; ty ++) { + try{ + client->Put(std::to_string(i), std::string(20480, 'a')); + } catch(...) { + std::cout << "put key error: " << i << ", retry: " << ty << std::endl; + continue; + } + break; + } +#else + client->Put(std::to_string(i), std::string(10, 'a')); +#endif + // gettimeofday(&e, NULL); + // his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + // std::cout << "Latency (us):" + // << " Min: " << his.Minimum() + // << " Avg: " << his.Average() + // << " P99: " << his.Percentile(99.0) + // << " Max: " << his.Maximum() + // << " StdDev: " << his.StandardDeviation() + // << " Queries: " << his.Count() + // << std::endl; +} + +void multithread_read_db( + std::shared_ptr client, size_t start, size_t end) { + // Histogram his; + // his.Clear(); + // struct timeval s, e; + for (size_t i = start; i < end; i++) { + if(i % 100 == 0) std::cout << "key: " << i << std::endl; + // gettimeofday(&s, NULL); + std::optional ret; + for(int ty = 0; ty < 5; ty ++) { + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + std::cerr << "get key: " << i << ",error, and try re-get, because: " << exc.displayText() << '\n'; + continue; + } + break; + } + // gettimeofday(&e, NULL); +#ifdef LARGE_VALUE + assert(ret.has_value() && (ret.value().size() == 20480)); +#else + if(ret.value_or("").size() < 10) { + std::cout << "get value error"<< std::endl; + } +#endif + if(!ret.has_value()) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + // his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + // std::cout << "Latency (us):" + // << " Min: " << his.Minimum() + // << " Avg: " << his.Average() + // << " P99: " << his.Percentile(99.0) + // << " Max: " << his.Maximum() + // << " StdDev: " << his.StandardDeviation() + // << " Queries: " << his.Count() + // << std::endl; +} + +void multithread_cas_db( + std::shared_ptr client, size_t start, size_t end) { + // Histogram his; + // his.Clear(); + // struct timeval s, e; + for (size_t i = start; i < end; i++) { + // gettimeofday(&s, NULL); + bool is_swap; +#ifdef LARGE_VALUE + for(int ty = 0; ty < 5; ty ++) { + try{ + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), "test_new_value", is_swap); + } catch(...) { + std::cout << "cas key error: " << i << ", retry: " << ty << std::endl; + continue; + } + break; + } +#else + client->CompareAndSwap(std::to_string(i), std::string(10, 'a'), "test_new_value", is_swap); +#endif + // gettimeofday(&e, NULL); + if(!is_swap) { + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + // his.Add((e.tv_sec-s.tv_sec)*1000000 + (e.tv_usec - s.tv_usec)); + } + // std::cout << "Latency (us):" + // << " Min: " << his.Minimum() + // << " Avg: " << his.Average() + // << " P99: " << his.Percentile(99.0) + // << " Max: " << his.Maximum() + // << " StdDev: " << his.StandardDeviation() + // << " Queries: " << his.Count() + // << std::endl; +} + +void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { + size_t cnt = 0; + for (size_t i = 0; i < 10000000; i++) { + for(int k = 0; k < 5; k++) { + std::optional ret; + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + std::cerr << "get key: " << i << "retry: " << k << '\n'; + continue; + } + if(ret.has_value() && ret.value().size() == 14) { + cnt ++; + } + break; + } + } + std::cout << "key-value number" << cnt << std::endl; +} + +bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { + size_t per_job = (jobs + workers - 1) / workers; + std::vector pool; + pool.reserve(workers); + + if(rw == "w") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "r") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_read_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "cas") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_cas_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else { + random_valid_to_db(client, 0, jobs); + return true; + } + + for (size_t i = 0; i < workers; i++) { + pool[i].join(); + } + return true; +} + + +#define BATCH 1000 + +int main(int argc, char *argv[]) { + if (argc != 5) { + std::cout << "usage: ./exec $ip:port $concurrent $batch $rw"<< std::endl; + exit(EXIT_SUCCESS); + } + + std::cout << std::endl; + std::cout << "**********config*********" << std::endl; + std::cout << std::endl; + std::string ip_add = std::string(argv[1]); + std::cout << "ip address: " << ip_add << std::endl; + std::cout << "concurrent number: " << argv[2] << std::endl; + std::cout << "batch number: " << argv[3] << std::endl; + int concurrent = std::atoi(argv[2]); + uint64_t batch = std::atol(argv[3]); + std::string rw = argv[4]; + if(rw != "r" && rw != "w" && rw != "cas" && rw != "v") { + std::cout << "input rw error" << std::endl; + return 0; + } + std::cout << std::endl; + std::cout << std::endl; + int cpu_num; + cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); + batch = batch? batch: BATCH; + ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; + std::vector pd_addrs{ip_add}; + // Poco::AutoPtr console_channel(new Poco::ConsoleChannel); + // pingcap::Logger::get("pingcap.tikv").setChannel(console_channel); + + std::shared_ptr client; + if(rw != "cas") + client = std::shared_ptr(new RawClient(pd_addrs)); + else { + auto clit = new RawClient(pd_addrs); + clit->AsCASClient(); + client = std::shared_ptr(clit); + } + + // TimerCounter tc; + // tc.Start(); + multi_assign_jobs(client, batch, cpu_num, rw); + // tc.Stop(); + std::cout << "failed: " << fail_cnt << std::endl; + // tc.PrintTime(batch); + return 0; +} \ No newline at end of file diff --git a/src/test/raw_client_test/test_raw_client.cc b/src/test/raw_client_test/test_raw_client.cc new file mode 100644 index 00000000..83fa7cb9 --- /dev/null +++ b/src/test/raw_client_test/test_raw_client.cc @@ -0,0 +1,94 @@ +#include +#include +#include +#include + +class TimerCounter { + std::chrono::time_point start_; + std::chrono::time_point end_; + +public: + void Start() { start_ = std::chrono::high_resolution_clock::now(); } + void Stop() { end_ = std::chrono::high_resolution_clock::now(); } + void PrintTime(const std::string &msg, int64_t base) { + std::cout + << msg << " Run time " + << base * 1000 / std::chrono::duration(end_ - start_).count() + << "QPS" << std::endl; + } +}; + +using namespace pingcap; +using namespace pingcap::kv; + +void TestPutAndGet(std::shared_ptr client, const int cnt) { + for(int i = 0; i < cnt; i++) { + client->Put("key" + std::to_string(i), "value" + std::to_string(i)); + } + // for(int i = start; i < start + 10; i++) { + // auto value = client->Get("key" + std::to_string(i)); + // std::cout << "value is : " << value.value_or("null") << std::endl; + // } +} + +void TestValidGet(std::shared_ptr client, const int cnt) { + for(int i = 0; i < cnt + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("null") << std::endl; + } +} + +// if with ttl rocksdb should open ttl function +void TestPutAndGetWithTTL(std::shared_ptr client, const int start, uint64_t ms) { + for(int i = start; i < start + 10; i++) { + client->Put("key" + std::to_string(i), "value" + std::to_string(i), ms); + } + + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("null") << std::endl; + } + + for(int i = start; i < start + 10; i++) { + auto value = client->GetKeyTTL("key" + std::to_string(i)); + std::cout << "value TTL is : " << value << std::endl; + } +} + +void TestDeleteValues(std::shared_ptr client, const int start) { + for(int i = start; i < start + 10; i++) { + client->Delete("key" + std::to_string(i)); + } + + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("deleted") << std::endl; + } +} + +void TestCompareAndSwap(std::shared_ptr client, const int start) { + for(int i = start; i < start + 10; i++) { + bool s; + auto v = client->CompareAndSwap("key" + std::to_string(i), "value" + std::to_string(i), + "value" + std::to_string(i + 10), s); + std::cout << "old value: " << v.value_or("null") << std::endl; + } + + for(int i = start; i < start + 10; i++) { + auto value = client->Get("key" + std::to_string(i)); + std::cout << "value is : " << value.value_or("deleted") << std::endl; + } +} + + +int main() { + std::vector pd_addrs{"127.0.0.1:2379"}; + std::shared_ptr client = std::shared_ptr(new RawClient(pd_addrs)); + TimerCounter tc; + tc.Start(); + TestPutAndGet(client, 10000); + tc.Stop(); + tc.PrintTime("run time", 10000); + // with TTL + return 0; +} \ No newline at end of file diff --git a/src/test/raw_client_test/volume_stress.cc b/src/test/raw_client_test/volume_stress.cc new file mode 100644 index 00000000..404b85b3 --- /dev/null +++ b/src/test/raw_client_test/volume_stress.cc @@ -0,0 +1,215 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pingcap/Log.h" +#include +#include +#include + +using namespace pingcap; +using namespace pingcap::kv; + +std::atomic fail_cnt; +std::mutex plk; + +void multithread_write_to_db( + std::shared_ptr client, size_t start, size_t end) { + int64_t fail_sta = 0; + + for (size_t i = start; i < end; i++) { + int ty = 0; + for(; ty < 5; ty ++) { + try{ + client->Put(std::to_string(i), std::string(20480, 'a')); + } catch(...) { + continue; + } + break; + } // end try loop + if(ty >= 5) { + fail_sta ++; + } + } // end for loop + + { + std::lock_guard lk(plk); + std::cout << "| tid | number_keys | fail number |" << std::endl; + std::cout << "| " << std::this_thread::get_id() << " | " << end-start << " | " << fail_sta << " |" << std::endl; + std::cout << "|-------|---------------|---------------|" << std::endl; + } + +} + +void multithread_read_db( + std::shared_ptr client, size_t start, size_t end) { + int64_t fail_read = 0; + + for (size_t i = start; i < end; i++) { + std::optional ret; + int ty = 0; + for(; ty < 5; ty ++) { + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + continue; + } + break; + } // end try loop + assert(ret.has_value() && (ret.value().size() == 20480)); + if(!ret.has_value() || ty >=5) { + fail_read ++; + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + } // end for loop + + { + std::lock_guard lk(plk); + std::cout << "| tid | number_keys | fail number |" << std::endl; + std::cout << "| " << std::this_thread::get_id() << " | " << end-start << " | " << fail_read << " |" << std::endl; + std::cout << "|-------|---------------|---------------|" << std::endl; + } + +} + +void multithread_cas_db( + std::shared_ptr client, size_t start, size_t end) { + int64_t fail_cas = 0; + + for (size_t i = start; i < end; i++) { + bool is_swap; + int ty = 0; + for(; ty < 5; ty ++) { + try{ + client->CompareAndSwap(std::to_string(i), std::string(20480, 'a'), std::string(20480, 'b'), is_swap); + } catch(...) { + continue; + } + break; + } // end try loop + if(!is_swap) { + fail_cas ++; + fail_cnt.fetch_add(1, std::memory_order_relaxed); + } + }// end for loop + + { + std::lock_guard lk(plk); + std::cout << "| tid | number_keys | fail number |" << std::endl; + std::cout << "| " << std::this_thread::get_id() << " | " << end-start << " | " << fail_cas << " |" << std::endl; + std::cout << "|-------|---------------|---------------|" << std::endl; + + } +} + +void random_valid_to_db(std::shared_ptr client, size_t start, size_t end) { + size_t cnt = 0; + for (size_t i = 0; i < 100; i++) { + for(int k = 0; k < 5; k++) { + std::optional ret; + try { + ret = client->Get(std::to_string(i)); + } + catch(const Exception &exc) { + continue; + } + if(ret.has_value() && ret.value().size() == 14) { + cnt ++; + } + std::cout << "get key: " << ret.value() << std::endl; + break; + } + } +} + +bool multi_assign_jobs(std::shared_ptr client, size_t jobs, size_t workers, std::string rw) { + size_t per_job = (jobs + workers - 1) / workers; + std::vector pool; + pool.reserve(workers); + + if(rw == "w") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_write_to_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "r") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_read_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else if(rw == "cas") { + for (size_t w = 0; w < workers; w++) { + pool.emplace_back(multithread_cas_db, client, + w * per_job, (std::min((w + 1) * per_job, jobs))); + } + } else { + random_valid_to_db(client, 0, jobs); + return true; + } + + for (size_t i = 0; i < workers; i++) { + pool[i].join(); + } + return true; +} + + +#define BATCH 1000 + +int main(int argc, char *argv[]) { + if (argc != 5) { + std::cout << "usage: ./exec $ip:port $concurrent $batch $rw"<< std::endl; + exit(EXIT_SUCCESS); + } + + std::cout << std::endl; + std::cout << "**********config*********" << std::endl; + std::cout << std::endl; + std::string ip_add = std::string(argv[1]); + std::cout << "ip address: " << ip_add << std::endl; + std::cout << "concurrent number: " << argv[2] << std::endl; + std::cout << "batch number: " << argv[3] << std::endl; + int concurrent = std::atoi(argv[2]); + uint64_t batch = std::atol(argv[3]); + std::string rw = argv[4]; + if(rw != "r" && rw != "w" && rw != "cas" && rw != "v") { + std::cout << "input rw error" << std::endl; + return 0; + } + std::cout << std::endl; + std::cout << std::endl; + int cpu_num; + cpu_num = concurrent ? concurrent: sysconf(_SC_NPROCESSORS_CONF); + batch = batch? batch: BATCH; + ip_add = ip_add.empty()? "127.0.0.1:2379": ip_add; + std::vector pd_addrs{ip_add}; + // Poco::AutoPtr console_channel(new Poco::ConsoleChannel); + // pingcap::Logger::get("pingcap.tikv").setChannel(console_channel); + + std::shared_ptr client; + if(rw != "cas") + client = std::shared_ptr(new RawClient(pd_addrs)); + else { + auto clit = new RawClient(pd_addrs); + clit->AsCASClient(); + client = std::shared_ptr(clit); + } + + multi_assign_jobs(client, batch, cpu_num, rw); + std::cout << "failed: " << fail_cnt << std::endl; + return 0; +} \ No newline at end of file diff --git a/third_party/googletest b/third_party/googletest index e08a4602..a1cc8c55 160000 --- a/third_party/googletest +++ b/third_party/googletest @@ -1 +1 @@ -Subproject commit e08a4602778b3cbea36dbd53724db0f18840e274 +Subproject commit a1cc8c55195661a58ad60c3bb062a0b9c302710d diff --git a/third_party/kvproto b/third_party/kvproto index fc36d786..f7a7c8cc 160000 --- a/third_party/kvproto +++ b/third_party/kvproto @@ -1 +1 @@ -Subproject commit fc36d7869035ffd96810efdb9c1f053c6081a773 +Subproject commit f7a7c8ccda74ae17acdde7b47cf829ac6f5c2a52 diff --git a/third_party/libfiu b/third_party/libfiu index 20ea5e85..4906c58c 160000 --- a/third_party/libfiu +++ b/third_party/libfiu @@ -1 +1 @@ -Subproject commit 20ea5e85ec63e3dedd6904e103fb2e56b46840cf +Subproject commit 4906c58ccdbbac4ac1d867ab1e3ee606993139b2