Skip to content

Commit

Permalink
node-api: combine threadsafe_function state flags into single variable
Browse files Browse the repository at this point in the history
  • Loading branch information
mika-fischer committed Nov 16, 2024
1 parent b45c815 commit 0384dff
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ class ThreadSafeFunction {
resource,
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
is_closed(false),
state(OPEN),
dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
Expand All @@ -237,14 +236,14 @@ class ThreadSafeFunction {
node::Mutex::ScopedLock lock(this->mutex);

while (queue.size() >= max_queue_size && max_queue_size > 0 &&
!is_closing) {
state == OPEN) {
if (mode == napi_tsfn_nonblocking) {
return napi_queue_full;
}
cond->Wait(lock);
}

if (!is_closing) {
if (state == OPEN) {
queue.push(data);
Send();
return napi_ok;
Expand All @@ -253,7 +252,7 @@ class ThreadSafeFunction {
return napi_invalid_arg;
}
thread_count--;
if (!is_closed || thread_count > 0) {
if (!(state == CLOSED && thread_count == 0)) {
return napi_closing;
}
}
Expand All @@ -265,13 +264,13 @@ class ThreadSafeFunction {
napi_status Acquire() {
node::Mutex::ScopedLock lock(this->mutex);

if (is_closing) {
return napi_closing;
}
if (state == OPEN) {
thread_count++;

thread_count++;
return napi_ok;
}

return napi_ok;
return napi_closing;
}

napi_status Release(napi_threadsafe_function_release_mode mode) {
Expand All @@ -285,16 +284,18 @@ class ThreadSafeFunction {
thread_count--;

if (thread_count == 0 || mode == napi_tsfn_abort) {
if (!is_closing) {
is_closing = (mode == napi_tsfn_abort);
if (is_closing && max_queue_size > 0) {
if (state == OPEN) {
if (mode == napi_tsfn_abort) {
state = CLOSING;
}
if (state == CLOSING && max_queue_size > 0) {
cond->Signal(lock);
}
Send();
}
}

if (!is_closed || thread_count > 0) {
if (!(state == CLOSED && thread_count == 0)) {
return napi_ok;
}
}
Expand Down Expand Up @@ -372,8 +373,8 @@ class ThreadSafeFunction {

protected:
void ReleaseResources() {
if (!is_closed) {
is_closed = true;
if (state != CLOSED) {
state = CLOSED;
ref.Reset();
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
env->Unref();
Expand Down Expand Up @@ -409,9 +410,7 @@ class ThreadSafeFunction {

{
node::Mutex::ScopedLock lock(this->mutex);
if (is_closing) {
CloseHandlesAndMaybeDelete();
} else {
if (state == OPEN) {
size_t size = queue.size();
if (size > 0) {
data = queue.front();
Expand All @@ -425,7 +424,7 @@ class ThreadSafeFunction {

if (size == 0) {
if (thread_count == 0) {
is_closing = true;
state = CLOSING;
if (max_queue_size > 0) {
cond->Signal(lock);
}
Expand All @@ -434,6 +433,8 @@ class ThreadSafeFunction {
} else {
has_more = true;
}
} else {
CloseHandlesAndMaybeDelete();
}
}

Expand Down Expand Up @@ -466,7 +467,7 @@ class ThreadSafeFunction {
v8::HandleScope scope(env->isolate);
if (set_closing) {
node::Mutex::ScopedLock lock(this->mutex);
is_closing = true;
state = CLOSING;
if (max_queue_size > 0) {
cond->Signal(lock);
}
Expand Down Expand Up @@ -538,6 +539,8 @@ class ThreadSafeFunction {
using node::AsyncResource::CallbackScope;
};

enum State : unsigned char { OPEN, CLOSING, CLOSED };

static const unsigned char kDispatchIdle = 0;
static const unsigned char kDispatchRunning = 1 << 0;
static const unsigned char kDispatchPending = 1 << 1;
Expand All @@ -552,8 +555,7 @@ class ThreadSafeFunction {
std::queue<void*> queue;
uv_async_t async;
size_t thread_count;
bool is_closing;
bool is_closed;
State state;
std::atomic_uchar dispatch_state;

// These are variables set once, upon creation, and then never again, which
Expand Down

0 comments on commit 0384dff

Please sign in to comment.