From a207f11a98ffc941aca87d1f63b0c943887fdc4a Mon Sep 17 00:00:00 2001 From: MARiA so cute <33935209+NathanFreeman@users.noreply.github.com> Date: Fri, 24 Jan 2025 11:25:23 +0800 Subject: [PATCH] Fix bug #5662 --- src/server/reactor_thread.cc | 8 +--- src/server/worker.cc | 48 ++++++++++++++---------- tests/swoole_thread/server/bug_5662.phpt | 44 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 26 deletions(-) create mode 100644 tests/swoole_thread/server/bug_5662.phpt diff --git a/src/server/reactor_thread.cc b/src/server/reactor_thread.cc index 2c1a5e9ae6..ae7c9e7f8a 100644 --- a/src/server/reactor_thread.cc +++ b/src/server/reactor_thread.cc @@ -318,8 +318,8 @@ void ReactorThread::shutdown(Reactor *reactor) { } if (serv->is_thread_mode()) { - Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker(reactor->id)); - reactor->del(socket); + serv->stop_async_worker(serv->get_worker(reactor->id)); + return; } SW_LOOP_N(serv->worker_num) { @@ -339,10 +339,6 @@ void ReactorThread::shutdown(Reactor *reactor) { } }); - if (serv->is_thread_mode()) { - serv->stop_async_worker(serv->get_worker(reactor->id)); - } - reactor->set_wait_exit(true); } diff --git a/src/server/worker.cc b/src/server/worker.cc index 0f6ca46074..0a707f58dd 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -376,16 +376,13 @@ bool Server::kill_worker(WorkerId worker_id, bool wait_reactor) { } void Server::stop_async_worker(Worker *worker) { - Reactor *reactor = SwooleTG.reactor; - worker->shutdown(); if (worker->type == SW_PROCESS_EVENTWORKER) { reset_worker_counter(worker); } - /** - * force to end. - */ + // forced termination + Reactor *reactor = SwooleTG.reactor; if (reload_async == 0) { reactor->running = false; return; @@ -400,9 +397,12 @@ void Server::stop_async_worker(Worker *worker) { SwooleWG.worker_copy = new Worker{}; *SwooleWG.worker_copy = *worker; SwooleWG.worker = worker; - - if (worker->pipe_worker && !worker->pipe_worker->removed) { - reactor->remove_read_event(worker->pipe_worker); + auto pipe_worker = worker->pipe_worker; + if (is_thread_mode()) { + pipe_worker = get_worker_message_bus()->get_pipe_socket(worker->pipe_worker); + } + if (pipe_worker && !pipe_worker->removed) { + reactor->remove_read_event(pipe_worker); } if (is_base_mode()) { @@ -415,12 +415,12 @@ void Server::stop_async_worker(Worker *worker) { onBeforeShutdown(this); } } - for (auto ls : ports) { - reactor->del(ls->socket); - } if (worker->pipe_master && !worker->pipe_master->removed) { reactor->remove_read_event(worker->pipe_master); } + for (auto ls : ports) { + reactor->del(ls->socket); + } foreach_connection([reactor](Connection *conn) { if (!conn->peer_closed && !conn->socket->removed) { reactor->remove_read_event(conn->socket); @@ -437,6 +437,13 @@ void Server::stop_async_worker(Worker *worker) { swoole_sys_warning("failed to push WORKER_STOP message"); } } else if (is_thread_mode()) { + SW_LOOP_N(worker_num) { + if (i % reactor_num == reactor->id) { + auto pipe_master = get_worker_message_bus()->get_pipe_socket(get_worker_pipe_master(i)); + reactor->remove_read_event(pipe_master); + } + } + foreach_connection([this, reactor](Connection *conn) { if (conn->reactor_id == reactor->id && !conn->peer_closed && !conn->socket->removed) { reactor->remove_read_event(conn->socket); @@ -455,21 +462,21 @@ void Server::stop_async_worker(Worker *worker) { static void Worker_reactor_try_to_exit(Reactor *reactor) { Server *serv; - if (swoole_get_process_type() == SW_PROCESS_TASKWORKER) { + if (sw_likely(swoole_get_process_type() != SW_PROCESS_TASKWORKER)) { + serv = (Server *) reactor->ptr; + } else { ProcessPool *pool = (ProcessPool *) reactor->ptr; serv = (Server *) pool->ptr; - } else { - serv = (Server *) reactor->ptr; } - uint8_t call_worker_exit_func = 0; + bool has_call_worker_exit_func = false; while (1) { if (reactor->if_exit()) { reactor->running = false; } else { - if (serv->onWorkerExit && call_worker_exit_func == 0) { + if (serv->onWorkerExit && !has_call_worker_exit_func) { + has_call_worker_exit_func = true; serv->onWorkerExit(serv, sw_worker()); - call_worker_exit_func = 1; continue; } int remaining_time = serv->max_wait_time - (::time(nullptr) - SwooleWG.exit_time); @@ -504,6 +511,10 @@ void Server::drain_worker_pipe() { void Server::clean_worker_connections(Worker *worker) { sw_reactor()->destroyed = true; + if (sw_likely(is_base_mode())) { + foreach_connection([this](Connection *conn) { close(conn->session_id, true); }); + return; + } if (is_thread_mode()) { foreach_connection([this, worker](Connection *conn) { @@ -511,9 +522,6 @@ void Server::clean_worker_connections(Worker *worker) { close(conn->session_id, true); } }); - } else if (is_base_mode()) { - foreach_connection([this](Connection *conn) { close(conn->session_id, true); }); - } else { return; } } diff --git a/tests/swoole_thread/server/bug_5662.phpt b/tests/swoole_thread/server/bug_5662.phpt new file mode 100644 index 0000000000..e1fd17cc36 --- /dev/null +++ b/tests/swoole_thread/server/bug_5662.phpt @@ -0,0 +1,44 @@ +--TEST-- +swoole_thread/server: Github #5662 +--SKIPIF-- + +--FILE-- +set([ + 'log_file' => '/dev/null', + 'worker_num' => 2, + 'max_request' => 5, + 'init_arguments' => function () { + global $queue; + $queue = new Queue(); + return [$queue]; + } +]); +$server->on('WorkerStart', function (Swoole\Server $server, $workerId) { + [$queue] = Thread::getArguments(); + $queue->push('start', Queue::NOTIFY_ALL); +}); +$server->addProcess(new Swoole\Process(function ($process) use ($server, $port) { + [$queue] = Thread::getArguments(); + Assert::true($queue->pop(-1) == 'start'); + for ($i = 0; $i < 20; $i++) { + Assert::true(file_get_contents("http://127.0.0.1:{$port}/") == 'OK'); + } + $server->shutdown(); +})); +$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) { + $response->end('OK'); +}); +$server->start(); +?> +--EXPECT--