Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh authored Aug 12, 2024
2 parents 0e78b0c + 6991190 commit 95e4c48
Show file tree
Hide file tree
Showing 23 changed files with 512 additions and 89 deletions.
19 changes: 17 additions & 2 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ jobs:
# Build your program with the given configuration
run: cmake --build build --config ${{ env.BUILD_TYPE }}

- name: Cleanup
run: |
rm -rf ./deps
rm -rf ./buildtrees
- uses: actions/upload-artifact@v3
with:
name: ${{ env.ARTIFACT_PIKA_NAME }}
Expand All @@ -54,7 +59,7 @@ jobs:
working-directory: ${{ github.workspace }}/build
# Execute tests defined by the CMake configuration.
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
run: ctest -C ${{ env.BUILD_TYPE }}
run: ctest -C ${{ env.BUILD_TYPE }} --verbose

- name: Unit Test
working-directory: ${{ github.workspace }}
Expand Down Expand Up @@ -119,6 +124,11 @@ jobs:
source /opt/rh/gcc-toolset-13/enable
cmake --build build --config ${{ env.BUILD_TYPE }}
- name: Cleanup
run: |
rm -rf ./deps
rm -rf ./buildtrees
- name: Test
working-directory: ${{ github.workspace }}/build
run: ctest -C ${{ env.BUILD_TYPE }}
Expand Down Expand Up @@ -175,9 +185,14 @@ jobs:
run: |
cmake --build build --config ${{ env.BUILD_TYPE }}
- name: Cleanup
run: |
rm -rf ./deps
rm -rf ./buildtrees
- name: Test
working-directory: ${{ github.workspace }}/build
run: ctest --rerun-failed --output-on-failure -C ${{ env.BUILD_TYPE }}
run: ctest -C ${{ env.BUILD_TYPE }} --verbose

- name: Unit Test
working-directory: ${{ github.workspace }}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Pika can be deployed in a single-machine master-slave mode (slaveof) or in a [Co

## Pika Storage Engine Architecture

* Supports multiple platforms: CentOS, Ubuntu, macOS
* Supports multiple platforms: CentOS, Ubuntu, macOS, Rocky Linux
* Multi-threaded model
* Based on the RocksDB storage engine
* Multiple granularity data caching model
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Pika 力求在完全兼容 Redis 协议、 继承 Redis 便捷运维设计的前

## Pika架构之存储引擎

* 支持多平台 CentOS、Ubuntu、macOS
* 支持多平台 CentOS、Ubuntu、macOS、Rocky Linux
* 多线程模型
* 基于 RocksDB 的存储引擎
* 多粒度数据缓存模型
Expand Down
11 changes: 11 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ sync-binlog-thread-num : 1
# is used for replication.
log-path : ./log/

# log retention time of serverlogs(pika.{hostname}.{username}.log.{loglevel}.YYYYMMDD-HHMMSS) files that stored within log-path.
# Any serverlogs files that exceed this time will be cleaned up.
# The unit of serverlogs is in [days] and the default value is 7(days).
log-retention-time : 7

# Directory to store the data of Pika.
db-path : ./db/

Expand Down Expand Up @@ -351,6 +356,12 @@ level0-slowdown-writes-trigger : 20
# rocksdb level0_file_num_compaction_trigger
level0-file-num-compaction-trigger : 4

# enable db statistics [yes | no] default no
enable-db-statistics : no
# see rocksdb/include/rocksdb/statistics.h enum StatsLevel for more details
# only use ticker counter should set db-statistics-level to 2
db-statistics-level : 2

# The maximum size of the response package to client to prevent memory
# exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response.
# Supported Units [K|M|G]. The default unit is in [bytes].
Expand Down
14 changes: 14 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return log_path_;
}
int log_retention_time() {
std::shared_lock l(rwlock_);
return log_retention_time_;
}
std::string log_level() {
std::shared_lock l(rwlock_);
return log_level_;
Expand Down Expand Up @@ -174,6 +178,13 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_total_wal_size_;
}
bool enable_db_statistics() {
return enable_db_statistics_;
}
int db_statistics_level() {
std::shared_lock l(rwlock_);
return db_statistics_level_;
}
int64_t max_client_response_size() {
std::shared_lock l(rwlock_);
return max_client_response_size_;
Expand Down Expand Up @@ -905,6 +916,7 @@ class PikaConf : public pstd::BaseConf {
int db_sync_speed_ = 0;
std::string slaveof_;
std::string log_path_;
int log_retention_time_;
std::string log_level_;
std::string db_path_;
int db_instance_num_ = 0;
Expand All @@ -924,6 +936,8 @@ class PikaConf : public pstd::BaseConf {
int64_t thread_migrate_keys_num_ = 0;
int64_t max_write_buffer_size_ = 0;
int64_t max_total_wal_size_ = 0;
bool enable_db_statistics_ = false;
int db_statistics_level_ = 0;
int max_write_buffer_num_ = 0;
int min_write_buffer_number_to_merge_ = 1;
int level0_stop_writes_trigger_ = 36;
Expand Down
9 changes: 9 additions & 0 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class IncrCmd : public Cmd {
int64_t new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class IncrbyCmd : public Cmd {
Expand All @@ -138,6 +140,8 @@ class IncrbyCmd : public Cmd {
int64_t by_ = 0, new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class IncrbyfloatCmd : public Cmd {
Expand All @@ -161,6 +165,8 @@ class IncrbyfloatCmd : public Cmd {
double by_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class DecrCmd : public Cmd {
Expand Down Expand Up @@ -251,8 +257,11 @@ class AppendCmd : public Cmd {
private:
std::string key_;
std::string value_;
std::string new_value_;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class MgetCmd : public Cmd {
Expand Down
2 changes: 1 addition & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class BlockingBaseCmd : public Cmd {
void BlockThisClientToWaitLRPush(BlockKeyType block_pop_type, std::vector<std::string>& keys, int64_t expire_time);
void TryToServeBLrPopWithThisKey(const std::string& key, std::shared_ptr<DB> db);
static void ServeAndUnblockConns(void* args);
static void WriteBinlogOfPop(std::vector<WriteBinlogOfPopArgs>& pop_args);
static void WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args);
void removeDuplicates(std::vector<std::string>& keys_);
// blpop/brpop used functions end
};
Expand Down
3 changes: 2 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ class PikaServer : public pstd::noncopyable {
*/
void DoTimingTask();
void AutoCompactRange();
void AutoPurge();
void AutoBinlogPurge();
void AutoServerlogPurge();
void AutoDeleteExpiredDump();
void AutoUpdateNetworkMetric();
void PrintThreadPoolQueueStatus();
Expand Down
7 changes: 4 additions & 3 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ void* WorkerThread::ThreadMain() {
}

if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) {
//check if this conn disconnected from being blocked by blpop/brpop
dynamic_cast<net::DispatchThread*>(server_thread_)->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(in_conn));
net_multiplexer_->NetDelEvent(pfe->fd, 0);
CloseFd(in_conn);
in_conn = nullptr;
Expand Down Expand Up @@ -235,7 +233,6 @@ void WorkerThread::DoCronTask() {
}
conns_.clear();
deleting_conn_ipport_.clear();
return;
}

auto iter = conns_.begin();
Expand Down Expand Up @@ -274,9 +271,11 @@ void WorkerThread::DoCronTask() {
}
}
for (const auto& conn : to_close) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
}
for (const auto& conn : to_timeout) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
}
Expand Down Expand Up @@ -304,6 +303,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) {
void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) {
close(conn->fd());
if (auto dispatcher = dynamic_cast<DispatchThread *>(server_thread_); dispatcher != nullptr ) {
//check if this conn disconnected from being blocked by blpop/brpop
dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
dispatcher->RemoveWatchKeys(conn);
}
server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
Expand Down
12 changes: 12 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2241,6 +2241,18 @@ void ConfigCmd::ConfigGet(std::string& ret) {
: EncodeString(&config_body, "resetchannels");
}

if (pstd::stringmatch(pattern.data(), "enable-db-statistics", 1)) {
elements += 2;
EncodeString(&config_body, "enable-db-statistics");
EncodeString(&config_body, g_pika_conf->enable_db_statistics() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "db-statistics-level", 1)) {
elements += 2;
EncodeString(&config_body, "db-statistics-level");
EncodeNumber(&config_body, g_pika_conf->db_statistics_level());
}

std::stringstream resp;
resp << "*" << std::to_string(elements) << "\r\n" << config_body;
ret = resp.str();
Expand Down
17 changes: 17 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ int PikaConf::Load() {
if (log_path_[log_path_.length() - 1] != '/') {
log_path_ += "/";
}
GetConfInt("log-retention-time",&log_retention_time_);
if(log_retention_time_ < 0){
LOG(FATAL) << "log-retention-time invalid";
}
GetConfStr("loglevel", &log_level_);
GetConfStr("db-path", &db_path_);
GetConfInt("db-instance-num", &db_instance_num_);
Expand Down Expand Up @@ -707,6 +711,17 @@ int PikaConf::Load() {
max_rsync_parallel_num_ = kMaxRsyncParallelNum;
}

// rocksdb_statistics_tickers
std::string open_tickers;
GetConfStr("enable-db-statistics", &open_tickers);
enable_db_statistics_ = open_tickers == "yes";

db_statistics_level_ = 0;
GetConfInt("db-statistics-level", &db_statistics_level_);
if (db_statistics_level_ < 0) {
db_statistics_level_ = 0;
}

int64_t tmp_rsync_timeout_ms = -1;
GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms);
if(tmp_rsync_timeout_ms <= 0){
Expand Down Expand Up @@ -812,6 +827,8 @@ int PikaConf::ConfigRewrite() {
SetConfStr("slotmigrate", slotmigrate_.load() ? "yes" : "no");
SetConfInt64("slotmigrate-thread-num", slotmigrate_thread_num_);
SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
SetConfStr("enable-db-statistics", enable_db_statistics_ ? "yes" : "no");
SetConfInt("db-statistics-level", db_statistics_level_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
// cache config
Expand Down
Loading

0 comments on commit 95e4c48

Please sign in to comment.