Skip to content

Commit

Permalink
Add max_wait_time setting for the process pool to forcibly terminat…
Browse files Browse the repository at this point in the history
…e worker processes after they time out.
  • Loading branch information
matyhtf committed Jan 9, 2025
1 parent 3f78da5 commit 214334f
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 37 deletions.
33 changes: 33 additions & 0 deletions ext-src/php_swoole_cxx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ static const char *sw_known_strings[] = {

SW_API zend_string **sw_zend_known_strings = nullptr;

SW_API zend_refcounted *sw_refcount_ptr;

zend_refcounted *sw_get_refcount_ptr(zval *value) {
return (sw_refcount_ptr = value->value.counted);
}

//----------------------------------known string------------------------------------
namespace zend {
void known_strings_init(void) {
Expand Down Expand Up @@ -66,4 +72,31 @@ Variable call(const std::string &func_name, int argc, zval *argv) {
}

} // namespace function

Callable::Callable(zval *_zfn) {
ZVAL_UNDEF(&zfn);
if (!zval_is_true(_zfn)) {
php_swoole_fatal_error(E_WARNING, "illegal callback function");
return;
}
if (!sw_zend_is_callable_ex(_zfn, nullptr, 0, &fn_name, nullptr, &fcc, nullptr)) {
php_swoole_fatal_error(E_WARNING, "function '%s' is not callable", fn_name);
return;
}
zfn = *_zfn;
zval_add_ref(&zfn);
}

Callable::~Callable() {
if (!ZVAL_IS_UNDEF(&zfn)) {
zval_ptr_dtor(&zfn);
}
if (fn_name) {
efree(fn_name);
}
}

uint32_t Callable::refcount() {
return zval_refcount_p(&zfn);
}
} // namespace zend
27 changes: 6 additions & 21 deletions ext-src/php_swoole_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -593,18 +593,12 @@ class Callable {
Callable() {}

public:
Callable(zval *_zfn) {
ZVAL_UNDEF(&zfn);
if (!zval_is_true(_zfn)) {
php_swoole_fatal_error(E_WARNING, "illegal callback function");
return;
}
if (!sw_zend_is_callable_ex(_zfn, nullptr, 0, &fn_name, nullptr, &fcc, nullptr)) {
php_swoole_fatal_error(E_WARNING, "function '%s' is not callable", fn_name);
return;
}
zfn = *_zfn;
zval_add_ref(&zfn);
Callable(zval *_zfn);
~Callable();
uint32_t refcount();

zend_refcounted *refcount_ptr() {
return sw_get_refcount_ptr(&zfn);
}

zend_fcall_info_cache *ptr() {
Expand All @@ -629,15 +623,6 @@ class Callable {
bool call(uint32_t argc, zval *argv, zval *retval) {
return sw_zend_call_function_ex(&zfn, &fcc, argc, argv, retval) == SUCCESS;
}

~Callable() {
if (!ZVAL_IS_UNDEF(&zfn)) {
zval_ptr_dtor(&zfn);
}
if (fn_name) {
efree(fn_name);
}
}
};

#define _CONCURRENCY_HASHMAP_LOCK_(code) \
Expand Down
2 changes: 2 additions & 0 deletions ext-src/php_swoole_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ static inline size_t sw_active_thread_count(void) {
}
#endif

zend_refcounted *sw_get_refcount_ptr(zval *value);

void sw_php_exit(int status);
void sw_php_print_backtrace(zend_long cid = 0,
zend_long options = 0,
Expand Down
2 changes: 2 additions & 0 deletions ext-src/php_swoole_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ void php_swoole_process_clean();
int php_swoole_process_start(swoole::Worker *process, zval *zobject);
swoole::Worker *php_swoole_process_get_worker(zval *zobject);
void php_swoole_process_set_worker(zval *zobject, swoole::Worker *worker);

swoole::ProcessPool *sw_process_pool();
32 changes: 31 additions & 1 deletion ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) {
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerPid"), getpid());
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerId"), worker->id);

swoole_set_process_type(SW_PROCESS_WORKER);
SwooleG.enable_coroutine = pp->enable_coroutine;

if (pp->onWorkerStart) {
zval args[2];
args[0] = *zobject;
Expand Down Expand Up @@ -259,6 +262,9 @@ static void process_pool_onStart(ProcessPool *pool) {
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("master_pid"), getpid());
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true);

swoole_set_process_type(SW_PROCESS_MASTER);
SwooleG.enable_coroutine = false;

if (pp->onStart == nullptr) {
return;
}
Expand Down Expand Up @@ -312,6 +318,10 @@ static void process_pool_signal_handler(int sig) {
}
}

ProcessPool *sw_process_pool() {
return current_pool;
}

static PHP_METHOD(swoole_process_pool, __construct) {
zval *zobject = ZEND_THIS;
zend_long worker_num;
Expand All @@ -321,15 +331,31 @@ static PHP_METHOD(swoole_process_pool, __construct) {

// only cli env
if (!SWOOLE_G(cli)) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
zend_throw_error(NULL, "%s can only be used in PHP CLI mode", SW_Z_OBJCE_NAME_VAL_P(zobject));
RETURN_FALSE;
}

if (sw_server()) {
zend_throw_error(NULL, "%s cannot use in server process", SW_Z_OBJCE_NAME_VAL_P(zobject));
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
zend_throw_error(NULL, "cannot create server and process pool instances simultaneously");
RETURN_FALSE;
}

if (sw_process_pool()) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
zend_throw_error(NULL, "A process pool instance has already been created and cannot be created again");
RETURN_FALSE;
}

#ifdef SW_THREAD
if (!tsrm_is_main_thread()) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
zend_throw_exception_ex(swoole_exception_ce, -1, "This operation is only allowed in the main thread");
RETURN_FALSE;
}
#endif

if (zend_parse_parameters_throw(ZEND_NUM_ARGS(), "l|llb", &worker_num, &ipc_type, &msgq_key, &enable_coroutine) ==
FAILURE) {
RETURN_FALSE;
Expand Down Expand Up @@ -390,6 +416,10 @@ static PHP_METHOD(swoole_process_pool, set) {
if (php_swoole_array_get_value(vht, "max_package_size", ztmp)) {
pool->set_max_packet_size(php_swoole_parse_to_size(ztmp));
}
if (php_swoole_array_get_value(vht, "max_wait_time", ztmp)) {
zend_long v = zval_get_long(ztmp);
pool->max_wait_time = SW_MAX(0, SW_MIN(v, UINT32_MAX));
}
}

static PHP_METHOD(swoole_process_pool, on) {
Expand Down
2 changes: 1 addition & 1 deletion ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2659,7 +2659,7 @@ static PHP_METHOD(swoole_server, start) {
RETURN_FALSE;
}

if (SwooleTG.reactor) {
if (sw_reactor()) {
php_swoole_fatal_error(
E_WARNING, "eventLoop has already been created, unable to start %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
RETURN_FALSE;
Expand Down
17 changes: 14 additions & 3 deletions ext-src/swoole_timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

#include "php_swoole_cxx.h"
#include "php_swoole_process.h"

#include "swoole_server.h"

Expand Down Expand Up @@ -148,6 +149,18 @@ static void timer_callback(Timer *timer, TimerNode *tnode) {
}
}

static bool timer_if_use_reactor() {
auto server = sw_server();
if (server) {
return server->is_user_worker() || (server->is_task_worker() && server->task_enable_coroutine);
}
auto process_pool = sw_process_pool();
if (process_pool) {
return !process_pool->is_master();
}
return true;
}

static void timer_add(INTERNAL_FUNCTION_PARAMETERS, bool persistent) {
zend_long ms;
Function *fci = (Function *) ecalloc(1, sizeof(Function));
Expand All @@ -166,9 +179,7 @@ static void timer_add(INTERNAL_FUNCTION_PARAMETERS, bool persistent) {
RETURN_FALSE;
}

// no server || user worker || task process with async mode
if (!sw_server() || sw_server()->is_user_worker() ||
(sw_server()->is_task_worker() && sw_server()->task_enable_coroutine)) {
if (UNEXPECTED(!sw_reactor() && timer_if_use_reactor())) {
php_swoole_check_reactor();
}

Expand Down
9 changes: 9 additions & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ struct ProcessPool {
max_packet_size_ = _max_packet_size;
}

bool is_master() {
return swoole_get_process_type() == SW_PROCESS_MASTER;
}

bool is_worker() {
return swoole_get_process_type() == SW_PROCESS_WORKER;
}

void set_protocol(enum ProtocolType _protocol_type);

void set_max_request(uint32_t _max_request, uint32_t _max_request_grace);
Expand All @@ -339,6 +347,7 @@ struct ProcessPool {
bool reload();
pid_t spawn(Worker *worker);
void stop(Worker *worker);
void kill_all_workers(int signo = SIGKILL);
swResultCode dispatch(EventData *data, int *worker_id);
int response(const char *data, int length);
swResultCode dispatch_blocking(EventData *data, int *dst_worker_id);
Expand Down
5 changes: 2 additions & 3 deletions src/core/timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ bool Timer::init() {
}

bool Timer::init_with_user_scheduler(const TimerScheduler &scheduler) {
set = [&scheduler](Timer *timer, long exec_msec) -> int {
return scheduler(timer, exec_msec);
};
set = [&scheduler](Timer *timer, long exec_msec) -> int { return scheduler(timer, exec_msec); };
close = [&scheduler](Timer *timer) { scheduler(timer, -1); };
return true;
}
Expand Down Expand Up @@ -236,6 +234,7 @@ int Timer::select() {
heap.pop();
map.erase(tnode->id);
delete tnode;
tnode = nullptr;
}

if (!tnode || !tmp) {
Expand Down
31 changes: 23 additions & 8 deletions src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPCMode _ip
main_loop = run_with_task_protocol;
protocol_type_ = SW_PROTOCOL_TASK;
max_packet_size_ = SW_INPUT_BUFFER_SIZE;
max_wait_time = SW_WORKER_MAX_WAIT_TIME;

SW_LOOP_N(_worker_num) {
workers[i].pool = this;
Expand Down Expand Up @@ -444,6 +445,12 @@ void ProcessPool::stop(Worker *worker) {
}
}

void ProcessPool::kill_all_workers(int signo) {
SW_LOOP_N(worker_num) {
swoole_kill(workers[i].pid, signo);
}
}

void ProcessPool::shutdown() {
uint32_t i;
int status;
Expand All @@ -462,10 +469,20 @@ void ProcessPool::shutdown() {
continue;
}
}
if (max_wait_time) {
swoole_timer_add((long) max_wait_time * 1000, false, [this](Timer *, TimerNode *) { kill_all_workers(); });
}
for (i = 0; i < worker_num; i++) {
worker = &workers[i];
if (swoole_waitpid(worker->pid, &status, 0) < 0) {
swoole_sys_warning("waitpid(%d) failed", worker->pid);
SW_LOOP {
if (waitpid(worker->pid, &status, 0) < 0) {
if (errno == EINTR) {
sw_timer()->select();
continue;
}
swoole_sys_warning("waitpid(%d) failed", worker->pid);
}
break;
}
}
started = false;
Expand Down Expand Up @@ -853,7 +870,6 @@ bool ProcessPool::detach() {

int ProcessPool::wait() {
pid_t new_pid, reload_worker_pid = 0;
int ret;

while (running) {
ExitStatus exit_status = wait_process();
Expand Down Expand Up @@ -892,7 +908,7 @@ int ProcessPool::wait() {
}
if (!reloading) {
if (errno > 0 && errno != EINTR) {
swoole_sys_warning("[Manager] wait failed");
swoole_sys_warning("wait() failed");
}
continue;
} else {
Expand All @@ -913,7 +929,7 @@ int ProcessPool::wait() {
if (onWorkerNotFound) {
onWorkerNotFound(this, exit_status);
} else {
swoole_warning("[Manager]unknown worker[pid=%d]", exit_status.get_pid());
swoole_warning("unknown worker[pid=%d]", exit_status.get_pid());
}
continue;
}
Expand Down Expand Up @@ -944,13 +960,12 @@ int ProcessPool::wait() {
continue;
}
reload_worker_pid = reload_workers[reload_worker_i].pid;
ret = swoole_kill(reload_worker_pid, SIGTERM);
if (ret < 0) {
if (swoole_kill(reload_worker_pid, SIGTERM) < 0) {
if (errno == ECHILD) {
reload_worker_i++;
goto _kill_worker;
}
swoole_sys_warning("[Manager]swKill(%d) failed", reload_workers[reload_worker_i].pid);
swoole_sys_warning("kill(%d) failed", reload_workers[reload_worker_i].pid);
continue;
}
}
Expand Down
48 changes: 48 additions & 0 deletions tests/swoole_process_pool/max_wait_time.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
--TEST--
swoole_process_pool: max wait time
--SKIPIF--
<?php require __DIR__ . '/../include/skipif.inc';
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use Swoole\Atomic;
use Swoole\Constant;
use Swoole\Process\Pool;
use Swoole\Timer;

(function () {
$atomic = new Atomic();
$pool = new Pool(4, SWOOLE_IPC_NONE);
$pool->set([
Constant::OPTION_ENABLE_COROUTINE => true,
Constant::OPTION_MAX_WAIT_TIME => 1,
]);

$pool->on('workerStart', function (Pool $pool, int $workerId) use ($atomic): void {
echo "workerStart: $workerId" . PHP_EOL;
$atomic->wait(-1);
});

$pool->on('start', function () use ($pool): void {
Timer::after(500, function () use ($pool): void {
$pool->shutdown();
});
echo 'start' . PHP_EOL;
});

$pool->on('shutdown', function () use ($atomic): void {
echo 'shutdown' . PHP_EOL;
});

$pool->start();
})();
?>
--EXPECTF--
start
workerStart: %d
workerStart: %d
workerStart: %d
workerStart: %d
shutdown

0 comments on commit 214334f

Please sign in to comment.