Skip to content

Commit

Permalink
Fix bug #5662
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman committed Feb 8, 2025
1 parent 214334f commit a207f11
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 26 deletions.
8 changes: 2 additions & 6 deletions src/server/reactor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ void ReactorThread::shutdown(Reactor *reactor) {
}

if (serv->is_thread_mode()) {
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker(reactor->id));
reactor->del(socket);
serv->stop_async_worker(serv->get_worker(reactor->id));
return;
}

SW_LOOP_N(serv->worker_num) {
Expand All @@ -339,10 +339,6 @@ void ReactorThread::shutdown(Reactor *reactor) {
}
});

if (serv->is_thread_mode()) {
serv->stop_async_worker(serv->get_worker(reactor->id));
}

reactor->set_wait_exit(true);
}

Expand Down
48 changes: 28 additions & 20 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,16 +376,13 @@ bool Server::kill_worker(WorkerId worker_id, bool wait_reactor) {
}

void Server::stop_async_worker(Worker *worker) {
Reactor *reactor = SwooleTG.reactor;

worker->shutdown();
if (worker->type == SW_PROCESS_EVENTWORKER) {
reset_worker_counter(worker);
}

/**
* force to end.
*/
// forced termination
Reactor *reactor = SwooleTG.reactor;
if (reload_async == 0) {
reactor->running = false;
return;
Expand All @@ -400,9 +397,12 @@ void Server::stop_async_worker(Worker *worker) {
SwooleWG.worker_copy = new Worker{};
*SwooleWG.worker_copy = *worker;
SwooleWG.worker = worker;

if (worker->pipe_worker && !worker->pipe_worker->removed) {
reactor->remove_read_event(worker->pipe_worker);
auto pipe_worker = worker->pipe_worker;
if (is_thread_mode()) {
pipe_worker = get_worker_message_bus()->get_pipe_socket(worker->pipe_worker);
}
if (pipe_worker && !pipe_worker->removed) {
reactor->remove_read_event(pipe_worker);
}

if (is_base_mode()) {
Expand All @@ -415,12 +415,12 @@ void Server::stop_async_worker(Worker *worker) {
onBeforeShutdown(this);
}
}
for (auto ls : ports) {
reactor->del(ls->socket);
}
if (worker->pipe_master && !worker->pipe_master->removed) {
reactor->remove_read_event(worker->pipe_master);
}
for (auto ls : ports) {
reactor->del(ls->socket);
}
foreach_connection([reactor](Connection *conn) {
if (!conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
Expand All @@ -437,6 +437,13 @@ void Server::stop_async_worker(Worker *worker) {
swoole_sys_warning("failed to push WORKER_STOP message");
}
} else if (is_thread_mode()) {
SW_LOOP_N(worker_num) {
if (i % reactor_num == reactor->id) {
auto pipe_master = get_worker_message_bus()->get_pipe_socket(get_worker_pipe_master(i));
reactor->remove_read_event(pipe_master);
}
}

foreach_connection([this, reactor](Connection *conn) {
if (conn->reactor_id == reactor->id && !conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
Expand All @@ -455,21 +462,21 @@ void Server::stop_async_worker(Worker *worker) {

static void Worker_reactor_try_to_exit(Reactor *reactor) {
Server *serv;
if (swoole_get_process_type() == SW_PROCESS_TASKWORKER) {
if (sw_likely(swoole_get_process_type() != SW_PROCESS_TASKWORKER)) {
serv = (Server *) reactor->ptr;
} else {
ProcessPool *pool = (ProcessPool *) reactor->ptr;
serv = (Server *) pool->ptr;
} else {
serv = (Server *) reactor->ptr;
}
uint8_t call_worker_exit_func = 0;

bool has_call_worker_exit_func = false;
while (1) {
if (reactor->if_exit()) {
reactor->running = false;
} else {
if (serv->onWorkerExit && call_worker_exit_func == 0) {
if (serv->onWorkerExit && !has_call_worker_exit_func) {
has_call_worker_exit_func = true;

Check warning on line 478 in src/server/worker.cc

View check run for this annotation

Codecov / codecov/patch

src/server/worker.cc#L478

Added line #L478 was not covered by tests
serv->onWorkerExit(serv, sw_worker());
call_worker_exit_func = 1;
continue;
}
int remaining_time = serv->max_wait_time - (::time(nullptr) - SwooleWG.exit_time);
Expand Down Expand Up @@ -504,16 +511,17 @@ void Server::drain_worker_pipe() {

void Server::clean_worker_connections(Worker *worker) {
sw_reactor()->destroyed = true;
if (sw_likely(is_base_mode())) {
foreach_connection([this](Connection *conn) { close(conn->session_id, true); });
return;

Check warning on line 516 in src/server/worker.cc

View check run for this annotation

Codecov / codecov/patch

src/server/worker.cc#L515-L516

Added lines #L515 - L516 were not covered by tests
}

if (is_thread_mode()) {
foreach_connection([this, worker](Connection *conn) {
if (conn->reactor_id == worker->id) {
close(conn->session_id, true);
}
});
} else if (is_base_mode()) {
foreach_connection([this](Connection *conn) { close(conn->session_id, true); });
} else {
return;
}
}
Expand Down
44 changes: 44 additions & 0 deletions tests/swoole_thread/server/bug_5662.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
--TEST--
swoole_thread/server: Github #5662
--SKIPIF--
<?php
require __DIR__ . '/../../include/skipif.inc';
skip_if_nts();
?>
--FILE--
<?php
require __DIR__ . '/../../include/bootstrap.php';

use Swoole\Thread;
use Swoole\Thread\Queue;

$port = get_constant_port(__FILE__);
$server = new Swoole\Http\Server('127.0.0.1', $port, SWOOLE_THREAD);
$server->set([
'log_file' => '/dev/null',
'worker_num' => 2,
'max_request' => 5,
'init_arguments' => function () {
global $queue;
$queue = new Queue();
return [$queue];
}
]);
$server->on('WorkerStart', function (Swoole\Server $server, $workerId) {
[$queue] = Thread::getArguments();
$queue->push('start', Queue::NOTIFY_ALL);
});
$server->addProcess(new Swoole\Process(function ($process) use ($server, $port) {
[$queue] = Thread::getArguments();
Assert::true($queue->pop(-1) == 'start');
for ($i = 0; $i < 20; $i++) {
Assert::true(file_get_contents("http://127.0.0.1:{$port}/") == 'OK');
}
$server->shutdown();
}));
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
$response->end('OK');
});
$server->start();
?>
--EXPECT--

0 comments on commit a207f11

Please sign in to comment.