From 3b732c9461fa826f7bdcd6e67ae87fe7b77ad309 Mon Sep 17 00:00:00 2001 From: matyhtf Date: Wed, 8 Jan 2025 15:23:15 +0800 Subject: [PATCH] Fix the issue of a multi-thread server unable to set port event callback, fix tests --- ext-src/swoole_server.cc | 18 +++--- include/swoole_server.h | 28 ++++++--- src/server/master.cc | 2 +- src/server/worker.cc | 4 +- tests/swoole_thread/server/udp_port.phpt | 76 ++++++++++++++++++++++++ tests/swoole_thread/sort.phpt | 5 ++ 6 files changed, 113 insertions(+), 20 deletions(-) create mode 100644 tests/swoole_thread/server/udp_port.phpt diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index 47cb1f6867..3e7a00fc72 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -2492,13 +2492,8 @@ static PHP_METHOD(swoole_server, getCallback) { static PHP_METHOD(swoole_server, listen) { Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS); - if (serv->is_worker_thread()) { - swoole_set_last_error(SW_ERROR_SERVER_UNRELATED_THREAD); - RETURN_FALSE; - } - - if (serv->is_started()) { - php_swoole_fatal_error(E_WARNING, "server is running, can't add listener"); + if (!serv->is_worker_thread() && serv->is_started()) { + php_swoole_fatal_error(E_WARNING, "server is running, cannot add listener"); RETURN_FALSE; } @@ -2513,7 +2508,12 @@ static PHP_METHOD(swoole_server, listen) { Z_PARAM_LONG(sock_type) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - ListenPort *ls = serv->add_port((enum swSocketType) sock_type, host, (int) port); + ListenPort *ls; + if (serv->is_worker_thread()) { + ls = serv->get_port((enum swSocketType) sock_type, host, (int) port); + } else { + ls = serv->add_port((enum swSocketType) sock_type, host, (int) port); + } if (!ls) { RETURN_FALSE; } @@ -2528,7 +2528,7 @@ extern Worker *php_swoole_process_get_and_check_worker(zval *zobject); static PHP_METHOD(swoole_server, addProcess) { Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS); if (!serv->is_worker_thread() && serv->is_started()) { - php_swoole_fatal_error(E_WARNING, "server is running, can't add process"); + php_swoole_fatal_error(E_WARNING, "server is running, cannot add process"); RETURN_FALSE; } diff --git a/include/swoole_server.h b/include/swoole_server.h index 238bb0cd89..d8393edcfa 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -782,7 +782,10 @@ class Server { return mode_; }; - const ListenPort *get_port(int _port) const { + /** + * This method can only be used for INET ports and cannot obtain Unix socket ports. + */ + ListenPort *get_port(int _port) const { for (auto port : ports) { if (port->port == _port || _port == 0) { return port; @@ -791,15 +794,24 @@ class Server { return nullptr; } - ListenPort *get_port_by_server_fd(int server_fd) { + ListenPort *get_port(SocketType type, const char *host, int _port) const { + for (auto port : ports) { + if (port->port == _port && port->type == type && strcmp(host, port->host.c_str()) == 0) { + return port; + } + } + return nullptr; + } + + ListenPort *get_port_by_server_fd(int server_fd) const { return (ListenPort *) connection_list[server_fd].object; } - ListenPort *get_port_by_fd(int fd) { + ListenPort *get_port_by_fd(int fd) const { return get_port_by_server_fd(connection_list[fd].server_fd); } - ListenPort *get_port_by_session_id(SessionId session_id) { + ListenPort *get_port_by_session_id(SessionId session_id) const { Connection *conn = get_connection_by_session_id(session_id); if (!conn) { return nullptr; @@ -807,7 +819,7 @@ class Server { return get_port_by_fd(conn->fd); } - network::Socket *get_server_socket(int fd) { + network::Socket *get_server_socket(int fd) const { return connection_list[fd].socket; } @@ -1286,7 +1298,7 @@ class Server { } } - int get_connection_fd(SessionId session_id) { + int get_connection_fd(SessionId session_id) const { return session_list[session_id % SW_SESSION_LIST_SIZE].fd; } @@ -1313,7 +1325,7 @@ class Server { return conn; } - Connection *get_connection(int fd) { + Connection *get_connection(int fd) const { if ((uint32_t) fd > max_connection) { return nullptr; } @@ -1333,7 +1345,7 @@ class Server { return nullptr; } - Connection *get_connection_by_session_id(SessionId session_id) { + Connection *get_connection_by_session_id(SessionId session_id) const { return get_connection(get_connection_fd(session_id)); } diff --git a/src/server/master.cc b/src/server/master.cc index 955af90cf6..b7cfa038d6 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -910,7 +910,7 @@ void Server::stop_master_thread() { Reactor *reactor = SwooleTG.reactor; reactor->set_wait_exit(true); for (auto port : ports) { - if (port->is_dgram() and is_process_mode()) { + if (port->is_dgram() && !is_base_mode()) { continue; } if (!port->socket->removed) { diff --git a/src/server/worker.cc b/src/server/worker.cc index fe63b34176..0f6ca46074 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -438,8 +438,8 @@ void Server::stop_async_worker(Worker *worker) { } } else if (is_thread_mode()) { foreach_connection([this, reactor](Connection *conn) { - if (conn->reactor_id == reactor->id) { - close(conn->session_id, true); + if (conn->reactor_id == reactor->id && !conn->peer_closed && !conn->socket->removed) { + reactor->remove_read_event(conn->socket); } }); } else { diff --git a/tests/swoole_thread/server/udp_port.phpt b/tests/swoole_thread/server/udp_port.phpt new file mode 100644 index 0000000000..f6f5c656cf --- /dev/null +++ b/tests/swoole_thread/server/udp_port.phpt @@ -0,0 +1,76 @@ +--TEST-- +swoole_thread/server: listen udp port +--SKIPIF-- + +--FILE-- +set(array( + 'worker_num' => 2, + 'log_level' => SWOOLE_LOG_ERROR, + 'reload_async' => true, + 'init_arguments' => function () { + global $queue, $atomic; + $queue = new Swoole\Thread\Queue(); + $atomic = new Swoole\Thread\Atomic(0); + return [$queue, $atomic]; + } +)); +$udp = $serv->addListener('127.0.0.1', $port + 1, SWOOLE_SOCK_UDP); +$udp->on('packet', function ($serv, $data, $addr) { + echo "udp packet\n"; + $serv->sendto($addr['address'], $addr['port'], $data); +}); +$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) { + [$queue, $atomic] = Thread::getArguments(); + if ($atomic->add() == 1) { + $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); + } + echo "worker start\n"; +}); +$serv->on('message', function (Server $server, $frame) { + echo "message\n"; +}); +$serv->on('workerExit', function (Server $server, $wid) { + var_dump('worker exit: ' . $wid); + Timer::clearAll(); +}); +$serv->on('shutdown', function (Server $server) { + global $queue, $atomic; + echo 'shutdown', PHP_EOL; + Assert::eq($atomic->get(), $server->setting['worker_num']); +}); +$serv->addProcess(new Swoole\Process(function ($process) use ($serv) { + [$queue, $atomic] = Thread::getArguments(); + global $port; + echo $queue->pop(-1); + Co\run(function () use ($port) { + $udp_sock = stream_socket_client('udp://127.0.0.1:' . ($port + 1), $errno, $errstr); + $pkt = random_bytes(1024); + fwrite($udp_sock, $pkt); + $data = fread($udp_sock, 1024); + Assert::eq($pkt, $data); + }); + echo "done\n"; + $serv->shutdown(); +})); +$serv->start(); +?> +--EXPECT-- +worker start +worker start +begin +udp packet +done +shutdown diff --git a/tests/swoole_thread/sort.phpt b/tests/swoole_thread/sort.phpt index cd8c227e93..9910236873 100644 --- a/tests/swoole_thread/sort.phpt +++ b/tests/swoole_thread/sort.phpt @@ -1,5 +1,10 @@ --TEST-- swoole_thread: sort +--SKIPIF-- + --FILE--