From f83381d68c6536bb7ef1fa442fe17d0ee371fa11 Mon Sep 17 00:00:00 2001 From: hantianfeng Date: Thu, 15 Aug 2024 16:32:22 +0800 Subject: [PATCH] fix test, optimize code --- include/swoole_message_bus.h | 5 ++++- include/swoole_server.h | 16 +++++----------- src/protocol/message_bus.cc | 18 +++++++++--------- src/server/base.cc | 14 ++++++++++---- src/server/master.cc | 11 +++++++++++ src/server/process.cc | 14 +++++++++++--- src/server/reactor_thread.cc | 12 +++++++----- 7 files changed, 57 insertions(+), 33 deletions(-) diff --git a/include/swoole_message_bus.h b/include/swoole_message_bus.h index 6391279d31a..faae0d1d69d 100644 --- a/include/swoole_message_bus.h +++ b/include/swoole_message_bus.h @@ -195,6 +195,9 @@ class MessageBus { * It is possible to operate the same pipe in multiple threads. * Each thread must have a unique buffer and the socket memory must be separated. */ - network::Socket *get_pipe_socket(int pipe_fd); + network::Socket *get_pipe_socket(network::Socket *sock) { + return pipe_sockets_[sock->get_fd()]; + } + void init_pipe_socket(network::Socket *sock); }; } // namespace swoole diff --git a/include/swoole_server.h b/include/swoole_server.h index 98d412c011e..9251aa04768 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -803,23 +803,16 @@ class Server { return connection_list[fd].socket; } - /** - * [ReactorThread] - */ - network::Socket *get_worker_pipe_socket(Worker *worker) { - return get_thread(SwooleTG.id)->message_bus.get_pipe_socket(worker->pipe_master->get_fd()); - } - network::Socket *get_command_reply_socket() { return is_base_mode() ? get_worker(0)->pipe_master : pipe_command->get_socket(false); } - int get_worker_pipe_master_fd(WorkerId id) { - return get_worker(id)->pipe_master->get_fd(); + network::Socket *get_worker_pipe_master(WorkerId id) { + return get_worker(id)->pipe_master; } - int get_worker_pipe_worker_fd(WorkerId id) { - return get_worker(id)->pipe_worker->get_fd(); + network::Socket *get_worker_pipe_worker(WorkerId id) { + return get_worker(id)->pipe_worker; } /** @@ -1382,6 +1375,7 @@ class Server { void init_port_protocol(ListenPort *port); void init_signal_handler(); void init_ipc_max_size(); + void init_pipe_sockets(MessageBus *mb); void set_max_connection(uint32_t _max_connection); diff --git a/src/protocol/message_bus.cc b/src/protocol/message_bus.cc index 8415b13a3cd..516072f9a38 100644 --- a/src/protocol/message_bus.cc +++ b/src/protocol/message_bus.cc @@ -287,17 +287,17 @@ size_t MessageBus::get_memory_size() { return size; } -network::Socket *MessageBus::get_pipe_socket(int pipe_fd) { - if ((size_t) pipe_fd >= pipe_sockets_.size()) { - auto _socket = make_socket(pipe_fd, SW_FD_PIPE); - _socket->buffer_size = UINT_MAX; - _socket->set_nonblock(); +void MessageBus::init_pipe_socket(network::Socket *sock) { + int pipe_fd = sock->get_fd(); + if (pipe_fd >= pipe_sockets_.size()) { pipe_sockets_.resize(pipe_fd + 1); - pipe_sockets_[pipe_fd] = _socket; - return _socket; - } else { - return pipe_sockets_[pipe_fd]; } + auto _socket = make_socket(pipe_fd, SW_FD_PIPE); + _socket->buffer_size = UINT_MAX; + if (!_socket->nonblock) { + _socket->set_nonblock(); + } + pipe_sockets_[pipe_fd] = _socket; } MessageBus::~MessageBus() { diff --git a/src/server/base.cc b/src/server/base.cc index 94680251910..8e8a465287a 100644 --- a/src/server/base.cc +++ b/src/server/base.cc @@ -224,10 +224,16 @@ bool BaseFactory::finish(SendData *data) { bool BaseFactory::forward_message(Session *session, SendData *data) { Worker *worker = server_->gs->event_workers.get_worker(session->reactor_id); - int pipe_fd = worker->pipe_master->get_fd(); - swoole_trace_log(SW_TRACE_SERVER, "forward message, fd=%d, len=%ld", pipe_fd, data->info.len); - auto message_bus = server_->get_worker_message_bus(); - if (!message_bus->write(message_bus->get_pipe_socket(pipe_fd), data)) { + swoole_trace_log(SW_TRACE_SERVER, + "fd=%d, worker_id=%d, type=%d, len=%ld", + worker->pipe_master->get_fd(), + session->reactor_id, + data->info.type, + data->info.len); + + auto mb = server_->get_worker_message_bus(); + auto sock = server_->is_thread_mode() ? mb->get_pipe_socket(worker->pipe_master) : worker->pipe_master; + if (!mb->write(sock, data)) { swoole_sys_warning("failed to send %u bytes to pipe_master", data->info.len); return false; } diff --git a/src/server/master.cc b/src/server/master.cc index 3ed57869fb2..a6d99b1e61a 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -2027,6 +2027,17 @@ void Server::init_ipc_max_size() { #endif } +void Server::init_pipe_sockets(MessageBus *mb) { + assert(is_started()); + size_t n = get_core_worker_num(); + + SW_LOOP_N(n) { + Worker *worker = get_worker(i); + mb->init_pipe_socket(worker->pipe_master); + mb->init_pipe_socket(worker->pipe_worker); + } +} + /** * allocate memory for Server::pipe_buffers */ diff --git a/src/server/process.cc b/src/server/process.cc index 44de0b3ab4f..1d8704122b8 100644 --- a/src/server/process.cc +++ b/src/server/process.cc @@ -279,10 +279,18 @@ bool ProcessFactory::dispatch(SendData *task) { SendData _task; memcpy(&_task, task, sizeof(SendData)); + network::Socket *sock; + MessageBus *mb; - network::Socket *pipe_socket = - server_->is_reactor_thread() ? server_->get_worker_pipe_socket(worker) : worker->pipe_master; - return server_->message_bus.write(pipe_socket, &_task); + if (server_->is_reactor_thread()) { + mb = &server_->get_thread(swoole_get_thread_id())->message_bus; + sock = mb->get_pipe_socket(worker->pipe_master); + } else { + mb = &server_->message_bus; + sock = worker->pipe_master; + } + + return mb->write(sock, &_task); } static bool inline process_is_supported_send_yield(Server *serv, Connection *conn) { diff --git a/src/server/reactor_thread.cc b/src/server/reactor_thread.cc index 36d3f7c54a5..c677732e50c 100644 --- a/src/server/reactor_thread.cc +++ b/src/server/reactor_thread.cc @@ -316,7 +316,7 @@ void ReactorThread::shutdown(Reactor *reactor) { } if (serv->is_thread_mode()) { - Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker_fd(reactor->id)); + Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker(reactor->id)); reactor->del(socket); } @@ -324,7 +324,7 @@ void ReactorThread::shutdown(Reactor *reactor) { if (i % serv->reactor_num != reactor->id) { continue; } - Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master_fd(i)); + Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i)); reactor->del(socket); } @@ -751,15 +751,17 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) { } serv->init_reactor(reactor); + serv->init_pipe_sockets(&message_bus); + if (serv->is_thread_mode()) { Worker *worker = serv->get_worker(reactor_id); serv->init_worker(worker); - auto pipe_worker = message_bus.get_pipe_socket(worker->pipe_worker->get_fd()); + auto pipe_worker = message_bus.get_pipe_socket(worker->pipe_worker); reactor->add(pipe_worker, SW_EVENT_READ); } if (serv->pipe_command) { - pipe_command = message_bus.get_pipe_socket(serv->pipe_command->get_socket(false)->get_fd()); + pipe_command = message_bus.get_pipe_socket(serv->pipe_command->get_socket(false)); pipe_command->buffer_size = UINT_MAX; } @@ -774,7 +776,7 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) { if (i % serv->reactor_num != reactor_id) { continue; } - Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master_fd(i)); + Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i)); if (reactor->add(socket, SW_EVENT_READ) < 0) { return SW_ERR; }