Skip to content

Commit

Permalink
Enhance process pool, add onShutdown/onWorkerExt event, add running/w…
Browse files Browse the repository at this point in the history
…orkerRunning/workerPid/workerId props
  • Loading branch information
matyhtf committed Dec 5, 2024
1 parent 5359616 commit 44c23e2
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 21 deletions.
7 changes: 7 additions & 0 deletions ext-src/php_swoole_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
_(SW_ZEND_STR_USE_PIPELINE_READ, "usePipelineRead") \
_(SW_ZEND_STR_TRAILER, "trailer") \
_(SW_ZEND_STR_MASTER_PID, "master_pid") \
_(SW_ZEND_PROP_MASTER_PID, "masterPid") \
_(SW_ZEND_STR_CALLBACK, "callback") \
_(SW_ZEND_STR_OPCODE, "opcode") \
_(SW_ZEND_STR_CODE, "code") \
Expand Down Expand Up @@ -737,6 +738,12 @@ static inline void array_unset(zval *arg, const char *key, size_t l_key) {
zend_hash_str_del(Z_ARRVAL_P(arg), key, l_key);
}

static inline zend_long object_get_long(zval *obj, zend_string *key) {
static zval rv;
zval *property = zend_read_property_ex(Z_OBJCE_P(obj), Z_OBJ_P(obj), key, 1, &rv);
return property ? zval_get_long(property) : 0;
}

static inline zend_long object_get_long(zval *obj, const char *key, size_t l_key) {
static zval rv;
zval *property = zend_read_property(Z_OBJCE_P(obj), Z_OBJ_P(obj), key, l_key, 1, &rv);
Expand Down
132 changes: 114 additions & 18 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ static Worker *current_worker = nullptr;
struct ProcessPoolObject {
ProcessPool *pool;
zend::Callable *onStart;
zend::Callable *onShutdown;
zend::Callable *onWorkerStart;
zend::Callable *onWorkerStop;
zend::Callable *onWorkerExit;
zend::Callable *onMessage;
zend_bool enable_coroutine;
zend_bool enable_message_bus;
Expand Down Expand Up @@ -86,6 +88,12 @@ static void process_pool_free_object(zend_object *object) {
if (pp->onStart) {
sw_callable_free(pp->onStart);
}
if (pp->onWorkerExit) {
sw_callable_free(pp->onWorkerExit);
}
if (pp->onShutdown) {
sw_callable_free(pp->onShutdown);
}

zend_object_std_dtor(object);
}
Expand Down Expand Up @@ -140,8 +148,14 @@ void php_swoole_process_pool_minit(int module_number) {
SW_SET_CLASS_CUSTOM_OBJECT(
swoole_process_pool, process_pool_create_object, process_pool_free_object, ProcessPoolObject, std);

zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("master_pid"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_long(
swoole_process_pool_ce, ZEND_STRL("master_pid"), -1, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED);
zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("masterPid"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("workerPid"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("workerId"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_null(swoole_process_pool_ce, ZEND_STRL("workers"), ZEND_ACC_PUBLIC);
zend_declare_property_bool(swoole_process_pool_ce, ZEND_STRL("workerRunning"), -1, ZEND_ACC_PUBLIC);
zend_declare_property_bool(swoole_process_pool_ce, ZEND_STRL("running"), -1, ZEND_ACC_PUBLIC);
}

static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) {
Expand All @@ -152,7 +166,12 @@ static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) {
current_pool = pool;
current_worker = worker;

if (pp->onMessage) {
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), true);
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerPid"), getpid());
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerId"), worker->id);

if (pp->onMessage || pp->enable_coroutine) {
swoole_signal_set(SIGTERM, process_pool_signal_handler);
}

Expand Down Expand Up @@ -201,6 +220,9 @@ static void process_pool_onWorkerStop(ProcessPool *pool, Worker *worker) {
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[2];

zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false);

if (pp->onWorkerStop == nullptr) {
return;
}
Expand All @@ -213,6 +235,64 @@ static void process_pool_onWorkerStop(ProcessPool *pool, Worker *worker) {
}
}

static void process_pool_onWorkerExit(ProcessPool *pool, Worker *worker) {
zval *zobject = (zval *) pool->ptr;
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[2];

zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false);

if (pp->onWorkerExit == nullptr) {
return;
}

args[0] = *zobject;
ZVAL_LONG(&args[1], worker->id);

if (UNEXPECTED(!zend::function::call(pp->onWorkerExit->ptr(), 2, args, nullptr, false))) {
php_swoole_error(E_WARNING, "%s->onWorkerExit handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

static void process_pool_onStart(ProcessPool *pool) {
zval *zobject = (zval *) pool->ptr;
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[1];

zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("master_pid"), getpid());
zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("masterPid"), getpid());
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true);

if (pp->onStart == nullptr) {
return;
}

args[0] = *zobject;
if (UNEXPECTED(!zend::function::call(pp->onStart->ptr(), 1, args, nullptr, false))) {
php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

static void process_pool_onShutdown(ProcessPool *pool) {
zval *zobject = (zval *) pool->ptr;
ProcessPoolObject *pp = process_pool_fetch_object(zobject);
zval args[1];

zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false);
zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false);

if (pp->onShutdown == nullptr) {
return;
}

args[0] = *zobject;

if (UNEXPECTED(!zend::function::call(pp->onShutdown->ptr(), 1, args, nullptr, false))) {
php_swoole_error(E_WARNING, "%s->onShutdown handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
}

static void process_pool_signal_handler(int sig) {
if (!current_pool) {
return;
Expand Down Expand Up @@ -343,7 +423,8 @@ static PHP_METHOD(swoole_process_pool, on) {
pp->onWorkerStart = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "Message")) {
if (pool->ipc_mode == SW_IPC_NONE) {
php_swoole_fatal_error(E_WARNING, "cannot set onMessage event with ipc_type=0");
zend_throw_exception(
swoole_exception_ce, "cannot set `onMessage` event with ipc_type=0", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}
if (pp->onMessage) {
Expand All @@ -355,11 +436,21 @@ static PHP_METHOD(swoole_process_pool, on) {
sw_callable_free(pp->onWorkerStop);
}
pp->onWorkerStop = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "WorkerExit")) {
if (pp->onWorkerExit) {
sw_callable_free(pp->onWorkerExit);
}
pp->onWorkerExit = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "Start")) {
if (pp->onStart) {
sw_callable_free(pp->onStart);
}
pp->onStart = sw_callable_create(zfn);
} else if (SW_STRCASEEQ(name, l_name, "Shutdown")) {
if (pp->onShutdown) {
sw_callable_free(pp->onShutdown);
}
pp->onShutdown = sw_callable_create(zfn);
} else {
php_swoole_error(E_WARNING, "unknown event type[%s]", name);
RETURN_FALSE;
Expand Down Expand Up @@ -489,31 +580,33 @@ static PHP_METHOD(swoole_process_pool, start) {
}
}

if (pp->onWorkerExit && !pp->enable_coroutine) {
zend_throw_exception(
swoole_exception_ce, "cannot set `onWorkerExit` without enable_coroutine", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}

if (pp->onMessage) {
pool->onMessage = process_pool_onMessage;
} else {
pool->main_loop = nullptr;
}

current_pool = pool;

pool->onStart = process_pool_onStart;
pool->onShutdown = process_pool_onShutdown;
pool->onWorkerStart = process_pool_onWorkerStart;
pool->onWorkerStop = process_pool_onWorkerStop;

zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("master_pid"), getpid());
if (pp->enable_coroutine && pp->onWorkerExit) {
pool->onWorkerExit = process_pool_onWorkerExit;
}

if (pool->start() < 0) {
RETURN_FALSE;
}

current_pool = pool;

if (pp->onStart) {
zval args[1];
args[0] = *ZEND_THIS;
if (UNEXPECTED(!zend::function::call(pp->onStart->ptr(), 1, args, nullptr, 0))) {
php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS));
}
}

pool->wait();
pool->shutdown();

Expand Down Expand Up @@ -618,10 +711,13 @@ static PHP_METHOD(swoole_process_pool, stop) {
}

static PHP_METHOD(swoole_process_pool, shutdown) {
zval *retval =
sw_zend_read_property_ex(swoole_process_pool_ce, ZEND_THIS, SW_ZSTR_KNOWN(SW_ZEND_STR_MASTER_PID), 0);
long pid = zval_get_long(retval);
RETURN_BOOL(swoole_kill(pid, SIGTERM) == 0);
long pid = zend::object_get_long(ZEND_THIS, SW_ZSTR_KNOWN(SW_ZEND_PROP_MASTER_PID));
if (pid > 0) {
RETURN_BOOL(swoole_kill(pid, SIGTERM) == 0);
} else {
zend_throw_exception(swoole_exception_ce, "invalid master pid", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}
}

static PHP_METHOD(swoole_process_pool, __destruct) {}
4 changes: 3 additions & 1 deletion include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ struct ProcessPool {
uint8_t scheduler_warning;
time_t warning_time;

void (*onStart)(ProcessPool *pool);
void (*onShutdown)(ProcessPool *pool);
int (*onTask)(ProcessPool *pool, Worker *worker, EventData *task);
void (*onWorkerStart)(ProcessPool *pool, Worker *worker);
void (*onMessage)(ProcessPool *pool, RecvData *msg);
void (*onWorkerExit)(ProcessPool *pool, Worker *worker);
void (*onWorkerStop)(ProcessPool *pool, Worker *worker);
void (*onWorkerError)(ProcessPool *pool, Worker *worker, const ExitStatus &exit_status);
void (*onWorkerMessage)(ProcessPool *pool, EventData *msg);
Expand All @@ -277,7 +280,6 @@ struct ProcessPool {
Worker *workers;
std::vector<std::shared_ptr<UnixSocket>> *pipes;
std::unordered_map<pid_t, Worker *> *map_;
Reactor *reactor;
MsgQueue *queue;
StreamInfo *stream_info_;
Channel *message_box = nullptr;
Expand Down
28 changes: 27 additions & 1 deletion src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,17 @@ int ProcessPool::start() {
if (start_check() < 0) {
return SW_ERR;
}

if (onStart) {
onStart(this);
}

SW_LOOP_N(worker_num) {
if (spawn(&(workers[i])) < 0) {
return SW_ERR;
}
}

return SW_OK;
}

Expand Down Expand Up @@ -417,9 +423,25 @@ bool ProcessPool::reload() {
}

void ProcessPool::stop(Worker *worker) {
if (async && worker->pipe_worker) {
worker->shutdown();

if (!swoole_event_is_available()) {
return;
}

auto reactor = sw_reactor();
if (worker->pipe_worker) {
swoole_event_del(worker->pipe_worker);
}

if (onWorkerExit) {
reactor->set_end_callback(Reactor::PRIORITY_TRY_EXIT, [&](Reactor *reactor) {
onWorkerExit(this, worker);
if (reactor->if_exit()) {
reactor->running = false;
}
});
}
}

void ProcessPool::shutdown() {
Expand All @@ -428,6 +450,10 @@ void ProcessPool::shutdown() {
Worker *worker;
running = 0;

if (onShutdown) {
onShutdown(this);
}

// concurrent kill
for (i = 0; i < worker_num; i++) {
worker = &workers[i];
Expand Down
2 changes: 1 addition & 1 deletion tests/include/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
define('REDIS_SERVER_PWD', getenv('REDIS_SERVER_PWD') ?: 'root');
define('REDIS_SERVER_DB', (int)(getenv('REDIS_SERVER_DB') ?: 0));

if (!getenv('SWOOLE_TEST_NO_DOCKER')) {
if (getenv('SWOOLE_TEST_IN_DOCKER')) {
if (!empty($info = `docker ps 2>&1 | grep httpbin 2>&1`) &&
preg_match('/\s+?[^:]+:(\d+)->\d+\/tcp\s+/', $info, $matches) &&
is_numeric($matches[1])) {
Expand Down
47 changes: 47 additions & 0 deletions tests/swoole_process_pool/master_callback.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
--TEST--
swoole_process_pool: master callback
--SKIPIF--
<?php require __DIR__ . '/../include/skipif.inc';
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

$pool = new Swoole\Process\Pool(1);

$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
echo "worker start\n";
Assert::true($pool->workerRunning);
Assert::eq($pool->workerId, 0);
Assert::eq($pool->workerPid, posix_getpid());
pcntl_signal(SIGTERM, function (){

});
$pool->shutdown();
sleep(20);
echo "worker exit\n";
});

$pool->on('workerStop', function (Swoole\Process\Pool $pool, int $workerId) {
Assert::false($pool->workerRunning);
echo "worker stop\n";
});

$pool->on('start', function (Swoole\Process\Pool $pool) {
Assert::true($pool->running);
echo "start\n";
});

$pool->on('shutdown', function (Swoole\Process\Pool $pool) {
Assert::false($pool->running);
echo "shutdown\n";
});

$pool->start();
?>
--EXPECT--
start
worker start
shutdown
worker exit
worker stop
Loading

0 comments on commit 44c23e2

Please sign in to comment.