Skip to content

Commit

Permalink
Fix the issue of a multi-thread server unable to set port event callb…
Browse files Browse the repository at this point in the history
…ack, fix tests
  • Loading branch information
matyhtf committed Jan 8, 2025
1 parent 58c870f commit 3b732c9
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 20 deletions.
18 changes: 9 additions & 9 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2492,13 +2492,8 @@ static PHP_METHOD(swoole_server, getCallback) {

static PHP_METHOD(swoole_server, listen) {
Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS);
if (serv->is_worker_thread()) {
swoole_set_last_error(SW_ERROR_SERVER_UNRELATED_THREAD);
RETURN_FALSE;
}

if (serv->is_started()) {
php_swoole_fatal_error(E_WARNING, "server is running, can't add listener");
if (!serv->is_worker_thread() && serv->is_started()) {
php_swoole_fatal_error(E_WARNING, "server is running, cannot add listener");
RETURN_FALSE;
}

Expand All @@ -2513,7 +2508,12 @@ static PHP_METHOD(swoole_server, listen) {
Z_PARAM_LONG(sock_type)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

ListenPort *ls = serv->add_port((enum swSocketType) sock_type, host, (int) port);
ListenPort *ls;
if (serv->is_worker_thread()) {
ls = serv->get_port((enum swSocketType) sock_type, host, (int) port);
} else {
ls = serv->add_port((enum swSocketType) sock_type, host, (int) port);
}
if (!ls) {
RETURN_FALSE;
}
Expand All @@ -2528,7 +2528,7 @@ extern Worker *php_swoole_process_get_and_check_worker(zval *zobject);
static PHP_METHOD(swoole_server, addProcess) {
Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS);
if (!serv->is_worker_thread() && serv->is_started()) {
php_swoole_fatal_error(E_WARNING, "server is running, can't add process");
php_swoole_fatal_error(E_WARNING, "server is running, cannot add process");
RETURN_FALSE;
}

Expand Down
28 changes: 20 additions & 8 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,10 @@ class Server {
return mode_;
};

const ListenPort *get_port(int _port) const {
/**
* This method can only be used for INET ports and cannot obtain Unix socket ports.
*/
ListenPort *get_port(int _port) const {
for (auto port : ports) {
if (port->port == _port || _port == 0) {
return port;
Expand All @@ -791,23 +794,32 @@ class Server {
return nullptr;
}

ListenPort *get_port_by_server_fd(int server_fd) {
ListenPort *get_port(SocketType type, const char *host, int _port) const {
for (auto port : ports) {
if (port->port == _port && port->type == type && strcmp(host, port->host.c_str()) == 0) {
return port;
}
}
return nullptr;
}

ListenPort *get_port_by_server_fd(int server_fd) const {
return (ListenPort *) connection_list[server_fd].object;
}

ListenPort *get_port_by_fd(int fd) {
ListenPort *get_port_by_fd(int fd) const {
return get_port_by_server_fd(connection_list[fd].server_fd);
}

ListenPort *get_port_by_session_id(SessionId session_id) {
ListenPort *get_port_by_session_id(SessionId session_id) const {
Connection *conn = get_connection_by_session_id(session_id);
if (!conn) {
return nullptr;
}
return get_port_by_fd(conn->fd);
}

network::Socket *get_server_socket(int fd) {
network::Socket *get_server_socket(int fd) const {
return connection_list[fd].socket;
}

Expand Down Expand Up @@ -1286,7 +1298,7 @@ class Server {
}
}

int get_connection_fd(SessionId session_id) {
int get_connection_fd(SessionId session_id) const {
return session_list[session_id % SW_SESSION_LIST_SIZE].fd;
}

Expand All @@ -1313,7 +1325,7 @@ class Server {
return conn;
}

Connection *get_connection(int fd) {
Connection *get_connection(int fd) const {
if ((uint32_t) fd > max_connection) {
return nullptr;
}
Expand All @@ -1333,7 +1345,7 @@ class Server {
return nullptr;
}

Connection *get_connection_by_session_id(SessionId session_id) {
Connection *get_connection_by_session_id(SessionId session_id) const {
return get_connection(get_connection_fd(session_id));
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ void Server::stop_master_thread() {
Reactor *reactor = SwooleTG.reactor;
reactor->set_wait_exit(true);
for (auto port : ports) {
if (port->is_dgram() and is_process_mode()) {
if (port->is_dgram() && !is_base_mode()) {
continue;
}
if (!port->socket->removed) {
Expand Down
4 changes: 2 additions & 2 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,8 @@ void Server::stop_async_worker(Worker *worker) {
}
} else if (is_thread_mode()) {
foreach_connection([this, reactor](Connection *conn) {
if (conn->reactor_id == reactor->id) {
close(conn->session_id, true);
if (conn->reactor_id == reactor->id && !conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
}
});
} else {
Expand Down
76 changes: 76 additions & 0 deletions tests/swoole_thread/server/udp_port.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
--TEST--
swoole_thread/server: listen udp port
--SKIPIF--
<?php
require __DIR__ . '/../../include/skipif.inc';
skip_if_nts();
?>
--FILE--
<?php
require __DIR__ . '/../../include/bootstrap.php';

use Swoole\Thread;
use Swoole\Timer;
use Swoole\WebSocket\Server;

$port = get_constant_port(__FILE__);

$serv = new Server('127.0.0.1', $port, SWOOLE_THREAD);
$serv->set(array(
'worker_num' => 2,
'log_level' => SWOOLE_LOG_ERROR,
'reload_async' => true,
'init_arguments' => function () {
global $queue, $atomic;
$queue = new Swoole\Thread\Queue();
$atomic = new Swoole\Thread\Atomic(0);
return [$queue, $atomic];
}
));
$udp = $serv->addListener('127.0.0.1', $port + 1, SWOOLE_SOCK_UDP);
$udp->on('packet', function ($serv, $data, $addr) {
echo "udp packet\n";
$serv->sendto($addr['address'], $addr['port'], $data);
});
$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) {
[$queue, $atomic] = Thread::getArguments();
if ($atomic->add() == 1) {
$queue->push("begin\n", Thread\Queue::NOTIFY_ALL);
}
echo "worker start\n";
});
$serv->on('message', function (Server $server, $frame) {
echo "message\n";
});
$serv->on('workerExit', function (Server $server, $wid) {
var_dump('worker exit: ' . $wid);
Timer::clearAll();
});
$serv->on('shutdown', function (Server $server) {
global $queue, $atomic;
echo 'shutdown', PHP_EOL;
Assert::eq($atomic->get(), $server->setting['worker_num']);
});
$serv->addProcess(new Swoole\Process(function ($process) use ($serv) {
[$queue, $atomic] = Thread::getArguments();
global $port;
echo $queue->pop(-1);
Co\run(function () use ($port) {
$udp_sock = stream_socket_client('udp://127.0.0.1:' . ($port + 1), $errno, $errstr);
$pkt = random_bytes(1024);
fwrite($udp_sock, $pkt);
$data = fread($udp_sock, 1024);
Assert::eq($pkt, $data);
});
echo "done\n";
$serv->shutdown();
}));
$serv->start();
?>
--EXPECT--
worker start
worker start
begin
udp packet
done
shutdown
5 changes: 5 additions & 0 deletions tests/swoole_thread/sort.phpt
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
--TEST--
swoole_thread: sort
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
skip_if_nts();
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';
Expand Down

0 comments on commit 3b732c9

Please sign in to comment.