Skip to content

Commit

Permalink
feat:Split the admin command out of the main thread (OpenAtomFoundati…
Browse files Browse the repository at this point in the history
…on#2727)

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin authored Jun 26, 2024
1 parent 0d71b24 commit 3890cd5
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 5 deletions.
9 changes: 9 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,18 @@ slow-cmd-pool : no
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
# This parameter is only supported by the CONFIG GET command and not by CONFIG SET.
admin-cmd-list : info, ping, monitor

# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6
Expand Down
29 changes: 29 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slow_cmd_thread_pool_size_;
}
int admin_thread_pool_size() {
std::shared_lock l(rwlock_);
return admin_thread_pool_size_;
}
int sync_thread_num() {
std::shared_lock l(rwlock_);
return sync_thread_num_;
Expand Down Expand Up @@ -441,6 +445,12 @@ class PikaConf : public pstd::BaseConf {
return pstd::Set2String(slow_cmd_set_, ',');
}

// Admin Commands configuration
const std::string GetAdminCmd() {
std::shared_lock l(rwlock_);
return pstd::Set2String(admin_cmd_set_, ',');
}

const std::string GetUserBlackList() {
std::shared_lock l(rwlock_);
return userblacklist_;
Expand All @@ -451,6 +461,10 @@ class PikaConf : public pstd::BaseConf {
return slow_cmd_set_.find(cmd) != slow_cmd_set_.end();
}

bool is_admin_cmd(const std::string& cmd) {
return admin_cmd_set_.find(cmd) != admin_cmd_set_.end();
}

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
Expand Down Expand Up @@ -489,6 +503,11 @@ class PikaConf : public pstd::BaseConf {
slow_cmd_thread_pool_size_ = value;
}

void SetAdminThreadPoolSize(const int value) {
std::lock_guard l(rwlock_);
admin_thread_pool_size_ = value;
}

void SetSlaveof(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slaveof", value);
Expand Down Expand Up @@ -814,6 +833,14 @@ class PikaConf : public pstd::BaseConf {
pstd::StringSplit2Set(lower_value, ',', slow_cmd_set_);
}

void SetAdminCmd(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
pstd::StringToLower(lower_value);
TryPushDiffCommands("admin-cmd-list", lower_value);
pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
}

void SetCacheType(const std::string &value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
Expand All @@ -832,7 +859,9 @@ class PikaConf : public pstd::BaseConf {
int thread_num_ = 0;
int thread_pool_size_ = 0;
int slow_cmd_thread_pool_size_ = 0;
int admin_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
std::unordered_set<std::string> admin_cmd_set_ = {"info", "ping", "monitor"};
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
Expand Down
3 changes: 2 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class PikaServer : public pstd::noncopyable {
/*
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
Expand Down Expand Up @@ -554,6 +554,7 @@ class PikaServer : public pstd::noncopyable {
int worker_num_ = 0;
std::unique_ptr<PikaClientProcessor> pika_client_processor_;
std::unique_ptr<net::ThreadPool> pika_slow_cmd_thread_pool_;
std::unique_ptr<net::ThreadPool> pika_admin_cmd_thread_pool_;
std::unique_ptr<PikaDispatchThread> pika_dispatch_thread_ = nullptr;

/*
Expand Down
13 changes: 12 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "admin-thread-pool-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->admin_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) {
elements += 2;
EncodeString(&config_body, "userblacklist");
Expand All @@ -1506,7 +1513,11 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-list");
EncodeString(&config_body, g_pika_conf->GetSlowCmd());
}

if (pstd::stringmatch(pattern.data(), "admin-cmd-list", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-cmd-list");
EncodeString(&config_body, g_pika_conf->GetAdminCmd());
}
if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-thread-num");
Expand Down
3 changes: 2 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
std::string opt = argvs[0][0];
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd);
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
Expand Down
15 changes: 15 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,25 @@ int PikaConf::Load() {
slow_cmd_thread_pool_size_ = 50;
}

GetConfInt("admin-thread-pool-size", &admin_thread_pool_size_);
if (admin_thread_pool_size_ <= 0) {
admin_thread_pool_size_ = 2;
}
if (admin_thread_pool_size_ > 4) {
admin_thread_pool_size_ = 4;
}

std::string slow_cmd_list;
GetConfStr("slow-cmd-list", &slow_cmd_list);
SetSlowCmd(slow_cmd_list);

std::string admin_cmd_list;
GetConfStr("admin-cmd-list", &admin_cmd_list);
if (admin_cmd_list == ""){
admin_cmd_list = "info, monitor, ping";
SetAdminCmd(admin_cmd_list);
}

GetConfInt("sync-thread-num", &sync_thread_num_);
if (sync_thread_num_ <= 0) {
sync_thread_num_ = 3;
Expand Down
3 changes: 2 additions & 1 deletion src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ void BlockingBaseCmd::TryToServeBLrPopWithThisKey(const std::string& key, std::s

auto* args = new UnblockTaskArgs(key, std::move(db), dispatchThread);
bool is_slow_cmd = g_pika_conf->is_slow_cmd("LPOP") || g_pika_conf->is_slow_cmd("RPOP");
g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd);
bool is_admin_cmd = false;
g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd, is_admin_cmd);
}

void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
Expand Down
21 changes: 20 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ PikaServer::PikaServer()

pika_client_processor_ = std::make_unique<PikaClientProcessor>(g_pika_conf->thread_pool_size(), 100000);
pika_slow_cmd_thread_pool_ = std::make_unique<net::ThreadPool>(g_pika_conf->slow_cmd_thread_pool_size(), 100000);
pika_admin_cmd_thread_pool_ = std::make_unique<net::ThreadPool>(g_pika_conf->admin_thread_pool_size(), 100000);
instant_ = std::make_unique<Instant>();
exit_mutex_.lock();
int64_t lastsave = GetLastSaveTime(g_pika_conf->bgsave_path());
Expand Down Expand Up @@ -110,6 +111,7 @@ PikaServer::~PikaServer() {
// so we need to delete dispatch before worker.
pika_client_processor_->Stop();
pika_slow_cmd_thread_pool_->stop_thread_pool();
pika_admin_cmd_thread_pool_->stop_thread_pool();
{
std::lock_guard l(slave_mutex_);
auto iter = slaves_.begin();
Expand Down Expand Up @@ -168,6 +170,19 @@ void PikaServer::Start() {
LOG(FATAL) << "Start PikaClientProcessor Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}

ret = pika_slow_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_admin_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start PikaAdminThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
Expand Down Expand Up @@ -720,11 +735,15 @@ void PikaServer::SetFirstMetaSync(bool v) {
first_meta_sync_ = v;
}

void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) {
void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) {
if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) {
pika_slow_cmd_thread_pool_->Schedule(func, arg);
return;
}
if (is_admin_cmd) {
pika_admin_cmd_thread_pool_->Schedule(func, arg);
return;
}
pika_client_processor_->SchedulePool(func, arg);
}

Expand Down

0 comments on commit 3890cd5

Please sign in to comment.