Skip to content

Commit

Permalink
Set bthread concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Yang Liming committed Oct 24, 2023
1 parent 90dcf0f commit a863b2e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 26 deletions.
56 changes: 34 additions & 22 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ DEFINE_int32(bthread_min_concurrency, 0,
" The laziness is disabled when this value is non-positive,"
" and workers will be created eagerly according to -bthread_concurrency and bthread_setconcurrency(). ");

DEFINE_int32(bthread_tag_to_set, BTHREAD_TAG_INVALID,
"Set bthread_concurrency or bthread_min_concurrency for tag");
DEFINE_int32(bthread_current_tag, BTHREAD_TAG_DEFAULT, "Set bthread concurrency for this tag");

DEFINE_int32(bthread_addconcurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
"Set bthread add concurrency by tag FLAGS_bthread_current_tag");

static bool never_set_bthread_concurrency = true;

Expand All @@ -58,10 +60,16 @@ const int ALLOW_UNUSED register_FLAGS_bthread_min_concurrency =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_min_concurrency,
validate_bthread_min_concurrency);

static bool validate_bthread_tag_to_set(const char*, int32_t val);
static bool validate_bthread_current_tag(const char*, int32_t val);

const int ALLOW_UNUSED register_FLAGS_bthread_current_tag =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_current_tag, validate_bthread_current_tag);

const int ALLOW_UNUSED register_FLAGS_bthread_tag_to_set =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_tag_to_set, validate_bthread_tag_to_set);
static bool validate_bthread_addconcurrency_by_tag(const char*, int32_t val);

const int ALLOW_UNUSED register_FLAGS_bthread_addconcurrency_by_tag =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_addconcurrency_by_tag,
validate_bthread_addconcurrency_by_tag);

BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);

Expand Down Expand Up @@ -109,18 +117,13 @@ inline TaskControl* get_or_new_task_control() {
static int add_workers_for_each_tag(int num) {
int res = 0;
auto c = get_task_control();
for (int i = 0; i < num; ++i) {
auto tag = i % FLAGS_task_group_ntags;
for (auto tag = BTHREAD_TAG_DEFAULT; tag < FLAGS_task_group_ntags && num > 0; ++tag) {
res += c->add_workers(1, tag);
--num;
}
return res;
}

static int add_workers_for_tag(int num, bthread_tag_t tag) {
auto c = get_task_control();
return c->add_workers(num, tag);
}

static bool validate_bthread_min_concurrency(const char*, int32_t val) {
if (val <= 0) {
return true;
Expand All @@ -142,8 +145,12 @@ static bool validate_bthread_min_concurrency(const char*, int32_t val) {
}
}

static bool validate_bthread_tag_to_set(const char*, int32_t val) {
return val >= BTHREAD_TAG_INVALID && val < FLAGS_task_group_ntags;
static bool validate_bthread_current_tag(const char*, int32_t val) {
return val > BTHREAD_TAG_INVALID && val < FLAGS_task_group_ntags;
}

static bool validate_bthread_addconcurrency_by_tag(const char*, int32_t val) {
return bthread_addconcurrency_by_tag(val, FLAGS_bthread_current_tag) == 0;
}

__thread TaskGroup* tls_task_group_nosignal = NULL;
Expand Down Expand Up @@ -344,19 +351,24 @@ int bthread_setconcurrency(int num) {
}
if (num > bthread::FLAGS_bthread_concurrency) {
// Create more workers if needed.
auto tag = bthread::FLAGS_bthread_tag_to_set;
auto add = num - bthread::FLAGS_bthread_concurrency;
if (tag == BTHREAD_TAG_INVALID) {
add = bthread::add_workers_for_each_tag(add);
} else {
add = bthread::add_workers_for_tag(add, tag);
}
bthread::FLAGS_bthread_concurrency += add;
int added = bthread::add_workers_for_each_tag(num - bthread::FLAGS_bthread_concurrency);
bthread::FLAGS_bthread_concurrency += added;
return 0;
}
return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
}

int bthread_addconcurrency_by_tag(int num, bthread_tag_t tag) {
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto c = bthread::get_task_control();
if (c == NULL) {
return 0;
}
int added = c->add_workers(num, tag);
bthread::FLAGS_bthread_concurrency += added;
return (added == num ? 0 : EPERM);
}

int bthread_about_to_quit() {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g != NULL) {
Expand Down
3 changes: 3 additions & 0 deletions src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ extern int bthread_getconcurrency(void);
// NOTE: currently concurrency cannot be reduced after any bthread created.
extern int bthread_setconcurrency(int num);

// Add number of worker pthreads to `num' for specified tag
extern int bthread_addconcurrency_by_tag(int num, bthread_tag_t tag);

// Yield processor to another bthread.
// Notice that current implementation is not fair, which means that
// even if bthread_yield() is called, suspended threads may still starve.
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ TaskControl::TaskControl()
// NOTE: all fileds must be initialized before the vars.
: _tagged_ngroup(FLAGS_task_group_ntags)
, _tagged_groups(FLAGS_task_group_ntags)
, _init(false)
, _stop(false)
, _concurrency(0)
, _next_worker_id(0)
Expand All @@ -177,7 +178,6 @@ TaskControl::TaskControl()
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _pl(FLAGS_task_group_ntags)
, _initialized(false)
{}

int TaskControl::init(int concurrency) {
Expand Down Expand Up @@ -234,7 +234,7 @@ int TaskControl::init(int concurrency) {
++i;
}

_initialized.store(true, butil::memory_order_release);
_init.store(true, butil::memory_order_release);

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class TaskControl {
std::vector<TaggedGroups> _tagged_groups;
butil::Mutex _modify_group_mutex;

butil::atomic<bool> _init; // if not init, bvar will case coredump
bool _stop;
butil::atomic<int> _concurrency;
std::vector<pthread_t> _workers;
Expand All @@ -140,7 +141,6 @@ class TaskControl {
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;

std::vector<TaggedParkingLot> _pl;
butil::atomic<bool> _initialized; // if not initialize, bvar will case coredump
};

inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
Expand All @@ -161,7 +161,7 @@ inline bvar::Adder<int64_t>& TaskControl::tag_nbthreads(bthread_tag_t tag) {

template <typename F>
inline void TaskControl::for_each_task_group(F const& f) {
if (_initialized.load(butil::memory_order_acquire) == false) {
if (_init.load(butil::memory_order_acquire) == false) {
return;
}
for (size_t i = 0; i < _tagged_groups.size(); ++i) {
Expand Down

0 comments on commit a863b2e

Please sign in to comment.