Skip to content

Commit

Permalink
add worker-thread-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxianrong committed Nov 8, 2024
1 parent 9721c9c commit 08ad77b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ class ThreadPool : public pstd::noncopyable {
int start();
int stop();
void Schedule(TaskFunc func, void* arg);
size_t cur_queue_size();
size_t cur_time_queue_size();


private:
void runInThread();
size_t max_queue_size();
void cur_queue_size(size_t* qsize);
void cur_time_queue_size(size_t* qsize);
void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg);
bool should_stop();
void set_should_stop();
Expand Down
28 changes: 19 additions & 9 deletions src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ bool ThreadPool::Worker::should_stop() { return should_stop_.load(); }
void ThreadPool::Worker::set_should_stop() { should_stop_.store(true); }

void ThreadPool::Worker::Schedule(TaskFunc func, void* arg) {
std::lock_guard lock(mu_);
//wsignal_.wait(lock, [worker]() { return queue_.size() < thread_pool_->max_queue_size_ || thread_pool_->should_stop(); });
std::unique_lock lock(mu_);
wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); });
if (queue_.size() < max_queue_size_ && !should_stop()) {
queue_.emplace(func, arg);
rsignal_.notify_one();
Expand All @@ -105,6 +105,7 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) {
for (int cnt = 0; cnt < worker_num_; cnt++) {
Worker* worker = workers_[next_thread];
worker->Schedule(func, arg);
last_thread_ = (next_thread + 1) % worker_num_;
next_thread = (next_thread + 1) % worker_num_;
}
}
Expand All @@ -117,7 +118,7 @@ void ThreadPool::Worker::DelaySchedule(uint64_t timeout, TaskFunc func, void* ar
uint64_t unow = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
uint64_t exec_time = unow + timeout * 1000;

std::lock_guard lock(mu_);
std::unique_lock lock(mu_);
if (!should_stop()) {
time_queue_.emplace(exec_time, func, arg);
rsignal_.notify_all();
Expand All @@ -126,23 +127,32 @@ void ThreadPool::Worker::DelaySchedule(uint64_t timeout, TaskFunc func, void* ar

size_t ThreadPool::Worker::max_queue_size() { return max_queue_size_; }

void ThreadPool::Worker::cur_queue_size(size_t* qsize) {
size_t ThreadPool::Worker::cur_queue_size() {
std::lock_guard lock(mu_);
*qsize = queue_.size();
return queue_.size();
}

void ThreadPool::Worker::cur_time_queue_size(size_t* qsize) {
size_t ThreadPool::Worker::cur_time_queue_size() {
std::lock_guard lock(mu_);
*qsize = time_queue_.size();
return time_queue_.size();
}

size_t ThreadPool::max_queue_size() { return max_queue_size_; }

void ThreadPool::cur_queue_size(size_t* qsize) {
*qsize = 10;
size_t qsize_sum = 0;
for (const auto worker : workers_) {
qsize_sum += worker->cur_queue_size();
}
qsize = &qsize_sum;
}

void ThreadPool::cur_time_queue_size(size_t* qsize) {
*qsize = 10;
size_t qsize_sum = 0;
for (const auto worker : workers_) {
qsize_sum += worker->cur_time_queue_size();
}
qsize = &qsize_sum;
}

std::string ThreadPool::thread_pool_name() { return thread_pool_name_; }
Expand Down

0 comments on commit 08ad77b

Please sign in to comment.